This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 60cd8a0a696 NIFI-15570 Track Content and Truncate large Resource
Claims in FileSystemRepository (#10874)
60cd8a0a696 is described below
commit 60cd8a0a69613ce6fbe1aab8a7f64177670fd881
Author: Mark Payne <[email protected]>
AuthorDate: Mon Mar 16 09:36:02 2026 -0400
NIFI-15570 Track Content and Truncate large Resource Claims in
FileSystemRepository (#10874)
Keep track of Content Claims where the last Claim in a Resource Claim can
be truncated if it is large. Whenever FlowFile Repository is checkpointed,
truncate any large Resource Claims when possible and necessary to avoid having
a situation where a small FlowFile in a given Resource Claim prevents a large
Content Claim from being cleaned up.
Signed-off-by: David Handermann <[email protected]>
---
.../controller/repository/claim/ContentClaim.java | 8 +
.../repository/claim/ResourceClaimManager.java | 21 ++
.../repository/FileSystemRepository.java | 227 +++++++++++++++-
.../repository/WriteAheadFlowFileRepository.java | 137 +++++++---
.../nifi/controller/TestFileSystemSwapManager.java | 9 +
.../repository/TestFileSystemRepository.java | 287 +++++++++++++++++++--
.../TestWriteAheadFlowFileRepository.java | 283 ++++++++++++++++++++
.../repository/claim/StandardContentClaim.java | 10 +
.../claim/StandardResourceClaimManager.java | 31 +++
.../claim/TestStandardResourceClaimManager.java | 51 ++++
.../repository/ByteArrayContentRepository.java | 5 +
.../tests/system/GenerateTruncatableFlowFiles.java | 115 +++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../ContentClaimTruncationAfterRestartIT.java | 163 ++++++++++++
.../repositories/ContentClaimTruncationIT.java | 153 +++++++++++
15 files changed, 1442 insertions(+), 59 deletions(-)
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
index 5c1d76bebbe..54745f9d28f 100644
---
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
@@ -44,4 +44,12 @@ public interface ContentClaim extends
Comparable<ContentClaim> {
* @return the length of this ContentClaim
*/
long getLength();
+
+ /**
+ * Indicates whether or not this ContentClaim is a candidate for
truncation.
+ * @return true if this ContentClaim is a candidate for truncation, false
otherwise
+ */
+ default boolean isTruncationCandidate() {
+ return false;
+ }
}
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
index 4c68383d86b..4a54d371a1f 100644
---
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
@@ -112,6 +112,17 @@ public interface ResourceClaimManager {
*/
void markDestructable(ResourceClaim claim);
+ /**
+ * Indicates that the Resource Claim associated with the given Content
Claim can now be
+ * truncated to the start of the ContentClaim. This should only ever be
called after it is
+ * guaranteed that the FlowFile Repository has been synchronized with its
underlying
+ * storage component for the same reason as described in the {@link
#markDestructable(ResourceClaim)}
+ * method.
+ *
+ * @param claim the ContentClaim that should be used for truncation
+ */
+ void markTruncatable(ContentClaim claim);
+
/**
* Drains up to {@code maxElements} Content Claims from the internal queue
* of destructable content claims to the given {@code destination} so that
@@ -138,6 +149,16 @@ public interface ResourceClaimManager {
*/
void drainDestructableClaims(Collection<ResourceClaim> destination, int
maxElements, long timeout, TimeUnit unit);
+ /**
+ * Drains up to {@code maxElements} Content Claims from the internal queue
+ * of truncatable content claims to the given {@code destination} so that
+ * they can be truncated.
+ *
+ * @param destination to drain to
+ * @param maxElements max items to drain
+ */
+ void drainTruncatableClaims(Collection<ContentClaim> destination, int
maxElements);
+
/**
* Clears the manager's memory of any and all ResourceClaims that it knows
* about
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 15ec9e78cdd..8987b6464f2 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -47,8 +47,10 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.channels.FileChannel;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardOpenOption;
@@ -99,9 +101,12 @@ public class FileSystemRepository implements
ContentRepository {
private final List<String> containerNames;
private final AtomicLong index;
- private final ScheduledExecutorService executor = new FlowEngine(4,
"FileSystemRepository Workers", true);
+ // Executor handles: BinDestructableClaims, one
ArchiveOrDestroyDestructableClaims per content repository container,
+ // TruncateClaims, and archive directory scanning tasks submitted during
initialization.
+ private final ScheduledExecutorService executor = new FlowEngine(6,
"FileSystemRepository Workers", true);
private final ConcurrentMap<String, BlockingQueue<ResourceClaim>>
reclaimable = new ConcurrentHashMap<>();
private final Map<String, ContainerState> containerStateMap = new
HashMap<>();
+ private final TruncationClaimManager truncationClaimManager = new
TruncationClaimManager();
// Queue for claims that are kept open for writing. Ideally, this will be
at
// least as large as the number of threads that will be updating the
repository simultaneously but we don't want
@@ -170,12 +175,13 @@ public class FileSystemRepository implements
ContentRepository {
archiveData = true;
if (maxArchiveSize == null) {
- throw new RuntimeException("No value specified for property '"
- +
NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is
enabled. You must configure the max disk usage in order to enable archiving.");
+ throw new RuntimeException("No value specified for property
'%s' but archiving is enabled. You must configure the max disk usage in order
to enable archiving.".formatted(
+ NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE));
}
if
(!MAX_ARCHIVE_SIZE_PATTERN.matcher(maxArchiveSize.trim()).matches()) {
- throw new RuntimeException("Invalid value specified for the '"
+ NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' property. Value must
be in format: <XX>%");
+ throw new RuntimeException("Invalid value specified for the
'%s' property. Value must be in format: <XX>%%".formatted(
+ NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE));
}
} else if ("false".equalsIgnoreCase(enableArchiving)) {
archiveData = false;
@@ -238,14 +244,15 @@ public class FileSystemRepository implements
ContentRepository {
this.resourceClaimManager = context.getResourceClaimManager();
this.eventReporter = context.getEventReporter();
- final Map<String, Path> fileRespositoryPaths =
nifiProperties.getContentRepositoryPaths();
+ final Map<String, Path> fileRepositoryPaths =
nifiProperties.getContentRepositoryPaths();
executor.scheduleWithFixedDelay(new BinDestructableClaims(), 1, 1,
TimeUnit.SECONDS);
- for (int i = 0; i < fileRespositoryPaths.size(); i++) {
+ for (int i = 0; i < fileRepositoryPaths.size(); i++) {
executor.scheduleWithFixedDelay(new
ArchiveOrDestroyDestructableClaims(), 1, 1, TimeUnit.SECONDS);
}
final long cleanupMillis =
this.determineCleanupInterval(nifiProperties);
+ executor.scheduleWithFixedDelay(new TruncateClaims(), cleanupMillis,
cleanupMillis, TimeUnit.MILLISECONDS);
for (final Map.Entry<String, Path> containerEntry :
containers.entrySet()) {
final String containerName = containerEntry.getKey();
@@ -689,7 +696,16 @@ public class FileSystemRepository implements
ContentRepository {
@Override
public int incrementClaimaintCount(final ContentClaim claim) {
- return incrementClaimantCount(claim == null ? null :
claim.getResourceClaim(), false);
+ if (claim == null) {
+ return 0;
+ }
+
+ if (claim.isTruncationCandidate() && claim instanceof final
StandardContentClaim scc) {
+ LOG.debug("{} is a truncation candidate, but is being claimed
again. Setting truncation candidate to false", claim);
+ scc.setTruncationCandidate(false);
+ }
+
+ return incrementClaimantCount(claim.getResourceClaim(), false);
}
protected int incrementClaimantCount(final ResourceClaim resourceClaim,
final boolean newClaim) {
@@ -741,6 +757,7 @@ public class FileSystemRepository implements
ContentRepository {
}
}
+ truncationClaimManager.removeTruncationClaims(claim);
return true;
}
@@ -1032,6 +1049,122 @@ public class FileSystemRepository implements
ContentRepository {
resourceClaimManager.purge();
}
+ private class TruncateClaims implements Runnable {
+
+ @Override
+ public void run() {
+ final Map<String, Boolean> truncationActivationCache = new
HashMap<>();
+
+ // Go through any known truncation claims and truncate them now if
truncation is enabled for their container.
+ for (final String container : containerNames) {
+ if (isTruncationActiveForContainer(container,
truncationActivationCache)) {
+ final List<ContentClaim> toTruncate =
truncationClaimManager.removeTruncationClaims(container);
+ if (toTruncate.isEmpty()) {
+ continue;
+ }
+
+ truncateClaims(toTruncate, truncationActivationCache);
+ }
+ }
+
+ // Drain any Truncation Claims from the Resource Claim Manager.
+ // If able, truncate those claims. Otherwise, save those claims in
the Truncation Claim Manager to be truncated on the next run.
+ // This prevents us from having a case where we could truncate a
big claim but we don't because we're not yet running out of disk space,
+ // but then we later start to run out of disk space and lost the
opportunity to truncate that big claim.
+ // Loop to drain the entire queue in a single invocation rather
than waiting for the next scheduled run. Because the default
+ // interval is 1 minute, waiting for the next run could delay
truncation on a disk that is already under pressure and increases
+ // the risk of having too many claims that the queue overflows (in
which case we would lose some optimization).
+ while (true) {
+ final List<ContentClaim> toTruncate = new ArrayList<>();
+ resourceClaimManager.drainTruncatableClaims(toTruncate,
10_000);
+ if (toTruncate.isEmpty()) {
+ return;
+ }
+
+ truncateClaims(toTruncate, truncationActivationCache);
+ }
+ }
+
+ private void truncateClaims(final List<ContentClaim> toTruncate, final
Map<String, Boolean> truncationActivationCache) {
+ final Map<String, List<ContentClaim>> claimsSkipped = new
HashMap<>();
+
+ for (final ContentClaim claim : toTruncate) {
+ final String container =
claim.getResourceClaim().getContainer();
+ if (!isTruncationActiveForContainer(container,
truncationActivationCache)) {
+ LOG.debug("Will not truncate {} because truncation is not
active for container {}; will save for later truncation", claim, container);
+ claimsSkipped.computeIfAbsent(container, key -> new
ArrayList<>()).add(claim);
+ continue;
+ }
+
+ if (claim.isTruncationCandidate()) {
+ truncate(claim);
+ }
+ }
+
+ claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+ }
+
+ private boolean isTruncationActiveForContainer(final String container,
final Map<String, Boolean> activationCache) {
+ // If not archiving data, we consider truncation always active.
+ if (!archiveData) {
+ return true;
+ }
+
+ final Boolean cachedValue = activationCache.get(container);
+ if (cachedValue != null) {
+ return cachedValue;
+ }
+
+ if (!isArchiveClearedOnLastRun(container)) {
+ LOG.debug("Truncation is not active for container {} because
the archive was not cleared on the last run", container);
+ activationCache.put(container, false);
+ return false;
+ }
+
+ final long usableSpace;
+ try {
+ usableSpace = getContainerUsableSpace(container);
+ } catch (final IOException ioe) {
+ LOG.warn("Failed to determine usable space for container {}.
Will not truncate claims for this container", container, ioe);
+ return false;
+ }
+
+ final Long minUsableSpace =
minUsableContainerBytesForArchive.get(container);
+ if (minUsableSpace != null && usableSpace < minUsableSpace) {
+ LOG.debug("Truncate is active for Container {} because usable
space of {} bytes is below the desired threshold of {} bytes.",
+ container, usableSpace, minUsableSpace);
+
+ activationCache.put(container, true);
+ return true;
+ }
+
+ activationCache.put(container, false);
+ return false;
+ }
+
+ private void truncate(final ContentClaim claim) {
+ LOG.info("Truncating {} to {} bytes because the FlowFile occupying
the last {} bytes has been removed",
+ claim.getResourceClaim(), claim.getOffset(),
claim.getLength());
+
+ final Path path = getPath(claim);
+ if (path == null) {
+ LOG.warn("Cannot truncate {} because the file cannot be
found", claim);
+ return;
+ }
+
+ try (final FileChannel fileChannel = FileChannel.open(path,
StandardOpenOption.WRITE)) {
+ fileChannel.truncate(claim.getOffset());
+ } catch (final NoSuchFileException nsfe) {
+ // This is unlikely but can occur if the claim was truncatable
and the underlying Resource Claim becomes
+ // destructable. In this case, we may archive or delete the
entire ResourceClaim. This is safe to ignore,
+ // since it means the data is cleaned up anyway.
+ LOG.debug("Failed to truncate {} because file [{}] does not
exist", claim, path, nsfe);
+ } catch (final IOException e) {
+ LOG.warn("Failed to truncate {} to {} bytes", claim,
claim.getOffset(), e);
+ }
+ }
+ }
+
private class BinDestructableClaims implements Runnable {
@Override
@@ -1120,6 +1253,11 @@ public class FileSystemRepository implements
ContentRepository {
final boolean archived = archive(curPath);
LOG.debug("Successfully moved {} to archive", claim);
+
+ if (archived) {
+ truncationClaimManager.removeTruncationClaims(claim);
+ }
+
return archived;
}
@@ -1392,7 +1530,7 @@ public class FileSystemRepository implements
ContentRepository {
if (notYetExceedingThreshold.isEmpty()) {
oldestContainerArchive = System.currentTimeMillis();
} else {
- oldestContainerArchive =
notYetExceedingThreshold.get(0).getLastModTime();
+ oldestContainerArchive =
notYetExceedingThreshold.getFirst().getLastModTime();
}
// Queue up the files in the order that they should be destroyed so
that we don't have to scan the directories for a while.
@@ -1400,10 +1538,11 @@ public class FileSystemRepository implements
ContentRepository {
fileQueue.offer(toEnqueue);
}
+
containerState.setArchiveClearedOnLastRun(notYetExceedingThreshold.isEmpty());
+
final long cleanupMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS)
- deleteOldestMillis - sortRemainingMillis - deleteExpiredMillis;
LOG.debug("Oldest Archive Date for Container {} is {}; delete expired
= {} ms, sort remaining = {} ms, delete oldest = {} ms, cleanup = {} ms",
containerName, new Date(oldestContainerArchive),
deleteExpiredMillis, sortRemainingMillis, deleteOldestMillis, cleanupMillis);
- return;
}
private class ArchiveOrDestroyDestructableClaims implements Runnable {
@@ -1543,6 +1682,7 @@ public class FileSystemRepository implements
ContentRepository {
private volatile long bytesUsed = 0L;
private volatile long checkUsedCutoffTimestamp = 0L;
+ private volatile boolean archiveClearedOnLastRun = false;
public ContainerState(final String containerName, final boolean
archiveEnabled, final long backPressureBytes, final long capacity) {
this.containerName = containerName;
@@ -1661,6 +1801,24 @@ public class FileSystemRepository implements
ContentRepository {
public void decrementArchiveCount() {
archivedFileCount.decrementAndGet();
}
+
+ public void setArchiveClearedOnLastRun(final boolean
archiveClearedOnLastRun) {
+ this.archiveClearedOnLastRun = archiveClearedOnLastRun;
+ }
+
+ public boolean isArchiveClearedOnLastRun() {
+ return archiveClearedOnLastRun;
+ }
+ }
+
+ // Visible for testing
+ protected boolean isArchiveClearedOnLastRun(final String containerName) {
+ final ContainerState containerState =
containerStateMap.get(containerName);
+ if (containerState == null) {
+ return false;
+ }
+
+ return containerState.isArchiveClearedOnLastRun();
}
protected static class ClaimLengthPair {
@@ -1882,19 +2040,27 @@ public class FileSystemRepository implements
ContentRepository {
// Mark the claim as no longer being able to be written to
resourceClaimManager.freeze(scc.getResourceClaim());
+ // If the content claim length is large (> 1 MB or the max
appendable claim length),
+ // mark the claim as a truncation candidate
+ final boolean largeClaim = scc.getLength() >
Math.min(1_000_000, maxAppendableClaimLength);
+ final boolean nonStartClaim = scc.getOffset() > 0;
+ if (largeClaim && nonStartClaim) {
+ scc.setTruncationCandidate(true);
+ }
+
// ensure that the claim is no longer on the queue
writableClaimQueue.remove(new
ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength));
- bcos.close();
- LOG.debug("Claim lenth >= max; Closing {}", this);
+ LOG.debug("Claim length >= max; Closing {}", this);
if (LOG.isTraceEnabled()) {
LOG.trace("Stack trace: ", new RuntimeException("Stack
Trace for closing " + this));
}
+ bcos.close();
}
}
@Override
- public synchronized ContentClaim newContentClaim() throws IOException {
+ public synchronized ContentClaim newContentClaim() {
scc = new StandardContentClaim(scc.getResourceClaim(),
scc.getOffset() + Math.max(0, scc.getLength()));
initialLength = 0;
bytesWritten = 0L;
@@ -1903,4 +2069,41 @@ public class FileSystemRepository implements
ContentRepository {
}
}
+ private static class TruncationClaimManager {
+ private static final int MAX_THRESHOLD = 100_000;
+ private final Map<String, List<ContentClaim>> truncationClaims = new
HashMap<>();
+
+ synchronized void addTruncationClaims(final String container, final
List<ContentClaim> claim) {
+ final List<ContentClaim> contentClaims =
truncationClaims.computeIfAbsent(container, c -> new ArrayList<>());
+ contentClaims.addAll(claim);
+
+ // If we have too many claims, remove the smallest ones so that we
only have the largest MAX_THRESHOLD claims.
+ if (contentClaims.size() > MAX_THRESHOLD) {
+
contentClaims.sort(Comparator.comparingLong(ContentClaim::getLength).reversed());
+ final List<ContentClaim> discardableClaims =
contentClaims.subList(MAX_THRESHOLD, contentClaims.size());
+ LOG.debug("Truncation Claim Manager has more than {} claims
for container {}; discarding {} claims: {}",
+ MAX_THRESHOLD, container, discardableClaims.size(),
discardableClaims);
+ discardableClaims.clear();
+ }
+ }
+
+ synchronized List<ContentClaim> removeTruncationClaims(final String
container) {
+ final List<ContentClaim> removed =
truncationClaims.remove(container);
+ return removed == null ? Collections.emptyList() : removed;
+ }
+
+ synchronized List<ContentClaim> removeTruncationClaims(final
ResourceClaim resourceClaim) {
+ final List<ContentClaim> contentClaims =
truncationClaims.get(resourceClaim.getContainer());
+ if (contentClaims == null) {
+ return Collections.emptyList();
+ }
+
+ final List<ContentClaim> claimsToRemove = contentClaims.stream()
+ .filter(cc -> cc.getResourceClaim().equals(resourceClaim))
+ .toList();
+
+ contentClaims.removeAll(claimsToRemove);
+ return claimsToRemove;
+ }
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index a04b7527917..b886c062b2e 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -21,7 +21,9 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.repository.schema.FieldCache;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
@@ -98,6 +100,7 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
private final List<File> flowFileRepositoryPaths = new ArrayList<>();
private final ScheduledExecutorService checkpointExecutor;
private final int maxCharactersToCache;
+ private final long truncationThreshold;
private volatile Collection<SerializedRepositoryRecord> recoveredRecords =
null;
private final Set<ResourceClaim> orphanedResourceClaims =
Collections.synchronizedSet(new HashSet<>());
@@ -132,6 +135,7 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
// before the data is destroyed, it's okay because the data will be
unknown to the Content Repository, so it will be destroyed
// on restart.
private final ConcurrentMap<Integer, BlockingQueue<ResourceClaim>>
claimsAwaitingDestruction = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, BlockingQueue<ContentClaim>>
claimsAwaitingTruncation = new ConcurrentHashMap<>();
/**
* default no args constructor for service loading only.
@@ -143,6 +147,7 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
nifiProperties = null;
retainOrphanedFlowFiles = true;
maxCharactersToCache = 0;
+ truncationThreshold = Long.MAX_VALUE;
}
public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
@@ -153,6 +158,10 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
retainOrphanedFlowFiles = orphanedFlowFileProperty == null ||
Boolean.parseBoolean(orphanedFlowFileProperty);
this.maxCharactersToCache =
nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE);
+ final long maxAppendableClaimLength =
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(),
DataUnit.B).longValue();
+ // Cap the truncation threshold at 1 MB so that claims larger than 1
MB are always eligible
+ // for truncation regardless of how large maxAppendableClaimSize is
configured.
+ truncationThreshold = Math.min(1_000_000, maxAppendableClaimLength);
final String directoryName =
nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
flowFileRepositoryPaths.add(new File(directoryName));
@@ -445,12 +454,13 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
// The below code is not entirely thread-safe, but we are OK with that
because the results aren't really harmful.
// Specifically, if two different threads call updateRepository with
DELETE records for the same Content Claim,
// it's quite possible for claimant count to be 0 below, which results
in two different threads adding the Content
- // Claim to the 'claimsAwaitDestruction' map. As a result, we can call
#markDestructable with the same ContentClaim
+ // Claim to the 'claimsAwaitingDestruction' map. As a result, we can
call #markDestructable with the same ContentClaim
// multiple times, and the #markDestructable method is not necessarily
idempotent.
// However, the result of this is that the FileSystem Repository may
end up trying to remove the content multiple times.
// This does not, however, cause problems, as ContentRepository should
handle this
// This does indicate that some refactoring should probably be
performed, though, as this is not a very clean interface.
- final Set<ResourceClaim> claimsToAdd = new HashSet<>();
+ final Set<ResourceClaim> destructableClaims = new HashSet<>();
+ final Set<ContentClaim> truncatableClaims = new HashSet<>();
final Set<String> swapLocationsAdded = new HashSet<>();
final Set<String> swapLocationsRemoved = new HashSet<>();
@@ -458,20 +468,34 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
for (final RepositoryRecord record : repositoryRecords) {
updateClaimCounts(record);
+ final ContentClaim contentClaim = record.getCurrentClaim();
+ final boolean truncationCandidate = contentClaim != null &&
contentClaim.isTruncationCandidate();
+ final boolean claimChanged =
!Objects.equals(record.getOriginalClaim(), contentClaim);
if (record.getType() == RepositoryRecordType.DELETE) {
- // For any DELETE record that we have, if claim is
destructible, mark it so
- if (record.getCurrentClaim() != null &&
isDestructable(record.getCurrentClaim())) {
-
claimsToAdd.add(record.getCurrentClaim().getResourceClaim());
+ // For any DELETE record that we have, if claim is
destructible or truncatable, mark it so
+ if (isDestructable(contentClaim)) {
+ destructableClaims.add(contentClaim.getResourceClaim());
+ } else if (truncationCandidate) {
+ truncatableClaims.add(contentClaim);
}
- // If the original claim is different than the current claim
and the original claim is destructible, mark it so
- if (record.getOriginalClaim() != null &&
!record.getOriginalClaim().equals(record.getCurrentClaim()) &&
isDestructable(record.getOriginalClaim())) {
-
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
+ // If the original claim is different than the current claim
and the original claim is destructible
+ // or truncatable, mark it so
+ if (claimChanged) {
+ if (isDestructable(record.getOriginalClaim())) {
+
destructableClaims.add(record.getOriginalClaim().getResourceClaim());
+ } else if (record.getOriginalClaim() != null &&
record.getOriginalClaim().isTruncationCandidate()) {
+ truncatableClaims.add(record.getOriginalClaim());
+ }
}
} else if (record.getType() == RepositoryRecordType.UPDATE) {
// if we have an update, and the original is no longer needed,
mark original as destructible
- if (record.getOriginalClaim() != null &&
record.getCurrentClaim() != record.getOriginalClaim() &&
isDestructable(record.getOriginalClaim())) {
-
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
+ if (claimChanged) {
+ if (isDestructable(record.getOriginalClaim())) {
+
destructableClaims.add(record.getOriginalClaim().getResourceClaim());
+ } else if (record.getOriginalClaim() != null &&
record.getOriginalClaim().isTruncationCandidate()) {
+ truncatableClaims.add(record.getOriginalClaim());
+ }
}
} else if (record.getType() == RepositoryRecordType.SWAP_OUT) {
final String swapLocation = record.getSwapLocation();
@@ -484,13 +508,16 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
}
}
- // Once the content claim counts have been updated for all records,
collect any transient claims that are eligible for destruction
+ // Once the content claim counts have been updated for all records,
collect any transient
+ // claims that are eligible for destruction or truncation
for (final RepositoryRecord record : repositoryRecords) {
final List<ContentClaim> transientClaims =
record.getTransientClaims();
if (transientClaims != null) {
for (final ContentClaim transientClaim : transientClaims) {
if (isDestructable(transientClaim)) {
- claimsToAdd.add(transientClaim.getResourceClaim());
+
destructableClaims.add(transientClaim.getResourceClaim());
+ } else if (transientClaim.isTruncationCandidate()) {
+ truncatableClaims.add(transientClaim);
}
}
}
@@ -504,19 +531,15 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
}
}
- if (!claimsToAdd.isEmpty()) {
- // Get / Register a Set<ContentClaim> for the given Partition Index
- final Integer partitionKey = Integer.valueOf(partitionIndex);
- BlockingQueue<ResourceClaim> claimQueue =
claimsAwaitingDestruction.get(partitionKey);
- if (claimQueue == null) {
- claimQueue = new LinkedBlockingQueue<>();
- final BlockingQueue<ResourceClaim> existingClaimQueue =
claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
- if (existingClaimQueue != null) {
- claimQueue = existingClaimQueue;
- }
- }
+ if (!destructableClaims.isEmpty()) {
+ // Get / Register a Set<ResourceClaim> for the given Partition
Index
+ final BlockingQueue<ResourceClaim> claimQueue =
claimsAwaitingDestruction.computeIfAbsent(partitionIndex, key -> new
LinkedBlockingQueue<>());
+ claimQueue.addAll(destructableClaims);
+ }
- claimQueue.addAll(claimsToAdd);
+ if (!truncatableClaims.isEmpty()) {
+ final BlockingQueue<ContentClaim> claimQueue =
claimsAwaitingTruncation.computeIfAbsent(partitionIndex, key -> new
LinkedBlockingQueue<>());
+ claimQueue.addAll(truncatableClaims);
}
}
@@ -566,16 +589,24 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
@Override
public void onSync(final int partitionIndex) {
- final BlockingQueue<ResourceClaim> claimQueue =
claimsAwaitingDestruction.get(partitionIndex);
- if (claimQueue == null) {
- return;
+ final BlockingQueue<ResourceClaim> destructionClaimQueue =
claimsAwaitingDestruction.get(partitionIndex);
+ if (destructionClaimQueue != null) {
+ final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
+ destructionClaimQueue.drainTo(claimsToDestroy);
+
+ for (final ResourceClaim claim : claimsToDestroy) {
+ markDestructable(claim);
+ }
}
- final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
- claimQueue.drainTo(claimsToDestroy);
+ final BlockingQueue<ContentClaim> truncationClaimQueue =
claimsAwaitingTruncation.get(partitionIndex);
+ if (truncationClaimQueue != null) {
+ final Set<ContentClaim> claimsToTruncate = new HashSet<>();
+ truncationClaimQueue.drainTo(claimsToTruncate);
- for (final ResourceClaim claim : claimsToDestroy) {
- markDestructable(claim);
+ for (final ContentClaim claim : claimsToTruncate) {
+ claimManager.markTruncatable(claim);
+ }
}
}
@@ -589,6 +620,15 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
markDestructable(claim);
}
}
+
+ for (final BlockingQueue<ContentClaim> claimQueue :
claimsAwaitingTruncation.values()) {
+ final Set<ContentClaim> claimsToTruncate = new HashSet<>();
+ claimQueue.drainTo(claimsToTruncate);
+
+ for (final ContentClaim claim : claimsToTruncate) {
+ claimManager.markTruncatable(claim);
+ }
+ }
}
/**
@@ -723,6 +763,10 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
queueMap.put(queue.getIdentifier(), queue);
}
+ final Set<StandardContentClaim> truncationEligibleClaims = new
HashSet<>();
+ final Set<ContentClaim> forbiddenTruncationClaims = new HashSet<>();
+ final Map<ResourceClaim, ContentClaim>
latestContentClaimByResourceClaim = new HashMap<>();
+
final List<SerializedRepositoryRecord> dropRecords = new ArrayList<>();
int numFlowFilesMissingQueue = 0;
long maxId = 0;
@@ -748,6 +792,15 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
}
final ContentClaim claim = record.getContentClaim();
+
+ // Track the latest Content Claim for each Resource Claim so that
we can determine which claims are eligible for truncation.
+ if (claim != null) {
+ final ContentClaim latestContentClaim =
latestContentClaimByResourceClaim.get(claim.getResourceClaim());
+ if (latestContentClaim == null || claim.getOffset() >
latestContentClaim.getOffset()) {
+
latestContentClaimByResourceClaim.put(claim.getResourceClaim(), claim);
+ }
+ }
+
final FlowFileQueue flowFileQueue = queueMap.get(queueId);
final boolean orphaned = flowFileQueue == null;
if (orphaned) {
@@ -777,6 +830,18 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
continue;
} else if (claim != null) {
+ // If the claim exceeds the max appendable claim length on its
own and doesn't start the Resource Claim,
+ // we will consider it to be eligible for truncation. However,
if there are multiple FlowFiles sharing the
+ // same claim, we cannot truncate it because doing so would
affect the other FlowFiles.
+ if (claim.getOffset() > 0 && claim.getLength() >
truncationThreshold && claim instanceof final StandardContentClaim scc) {
+ if (forbiddenTruncationClaims.contains(claim) ||
truncationEligibleClaims.contains(scc)) {
+ truncationEligibleClaims.remove(scc);
+ forbiddenTruncationClaims.add(scc);
+ } else {
+ truncationEligibleClaims.add(scc);
+ }
+ }
+
claimManager.incrementClaimantCount(claim.getResourceClaim());
}
@@ -786,6 +851,14 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
// If recoveredRecords has been populated it need to be nulled out now
because it is no longer useful and can be garbage collected.
recoveredRecords = null;
+ // If any Content Claim was determined to be truncatable, mark it as
such now.
+ for (final StandardContentClaim eligible : truncationEligibleClaims) {
+ final ContentClaim latestForResource =
latestContentClaimByResourceClaim.get(eligible.getResourceClaim());
+ if (Objects.equals(eligible, latestForResource)) {
+ eligible.setTruncationCandidate(true);
+ }
+ }
+
// Set the AtomicLong to 1 more than the max ID so that calls to
#getNextFlowFileSequence() will
// return the appropriate number.
flowFileSequenceGenerator.set(maxId + 1);
@@ -852,7 +925,7 @@ public class WriteAheadFlowFileRepository implements
FlowFileRepository, SyncLis
}
@Override
- public long getMaxFlowFileIdentifier() throws IOException {
+ public long getMaxFlowFileIdentifier() {
// flowFileSequenceGenerator is 1 more than the MAX so that we can
call #getAndIncrement on the AtomicLong
return flowFileSequenceGenerator.get() - 1;
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 9e0a1324a48..5a6b9d89bf6 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -21,6 +21,7 @@ import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
@@ -223,6 +224,10 @@ public class TestFileSystemSwapManager {
public void markDestructable(ResourceClaim claim) {
}
+ @Override
+ public void markTruncatable(final ContentClaim claim) {
+ }
+
@Override
public void drainDestructableClaims(Collection<ResourceClaim>
destination, int maxElements) {
}
@@ -231,6 +236,10 @@ public class TestFileSystemSwapManager {
public void drainDestructableClaims(Collection<ResourceClaim>
destination, int maxElements, long timeout, TimeUnit unit) {
}
+ @Override
+ public void drainTruncatableClaims(final Collection<ContentClaim>
destination, final int maxElements) {
+ }
+
@Override
public void purge() {
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index 7f0e2a7a9d7..102b927af3e 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -59,6 +59,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -81,14 +82,19 @@ public class TestFileSystemRepository {
private Path originalNifiPropertiesFile;
private Path rootFile;
private NiFiProperties nifiProperties;
+ private long maxClaimLength;
@BeforeEach
public void setup() throws IOException {
originalNifiPropertiesFile =
Paths.get("src/test/resources/conf/nifi.properties");
rootFile = tempDir.resolve("content_repository");
final String contentRepositoryDirectory =
NiFiProperties.REPOSITORY_CONTENT_PREFIX.concat("default");
- final Map<String, String> additionalProperties =
Map.of(contentRepositoryDirectory, rootFile.toString());
+ final Map<String, String> additionalProperties = Map.of(
+ contentRepositoryDirectory, rootFile.toString(),
+ NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 sec"
+ );
nifiProperties =
NiFiProperties.createBasicNiFiProperties(originalNifiPropertiesFile.toString(),
additionalProperties);
+ maxClaimLength =
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(),
DataUnit.B).longValue();
repository = new FileSystemRepository(nifiProperties);
claimManager = new StandardResourceClaimManager();
repository.initialize(new
StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
@@ -145,7 +151,6 @@ public class TestFileSystemRepository {
@Timeout(30)
public void testClaimsArchivedWhenMarkedDestructable() throws IOException,
InterruptedException {
final ContentClaim contentClaim = repository.create(false);
- final long configuredAppendableClaimLength =
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(),
DataUnit.B).longValue();
final Map<String, Path> containerPaths =
nifiProperties.getContentRepositoryPaths();
assertEquals(1, containerPaths.size());
final String containerName = containerPaths.keySet().iterator().next();
@@ -154,7 +159,7 @@ public class TestFileSystemRepository {
long bytesWritten = 0L;
final byte[] bytes = "Hello
World".getBytes(StandardCharsets.UTF_8);
- while (bytesWritten <= configuredAppendableClaimLength) {
+ while (bytesWritten <= maxClaimLength) {
out.write(bytes);
bytesWritten += bytes.length;
}
@@ -480,12 +485,9 @@ public class TestFileSystemRepository {
repository.incrementClaimaintCount(claim);
final Path claimPath = getPath(claim);
- final String maxAppendableClaimLength =
nifiProperties.getMaxAppendableClaimSize();
- final int maxClaimLength =
DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue();
-
// Create the file.
try (final OutputStream out = repository.write(claim)) {
- out.write(new byte[maxClaimLength]);
+ out.write(new byte[(int) maxClaimLength]);
}
int count = repository.decrementClaimantCount(claim);
@@ -502,10 +504,14 @@ public class TestFileSystemRepository {
}
private Path getPath(final ContentClaim claim) {
+ return getPath(repository, claim);
+ }
+
+ private Path getPath(final FileSystemRepository repo, final ContentClaim
claim) {
try {
- final Method m =
repository.getClass().getDeclaredMethod("getPath", ContentClaim.class);
+ final Method m =
FileSystemRepository.class.getDeclaredMethod("getPath", ContentClaim.class);
m.setAccessible(true);
- return (Path) m.invoke(repository, claim);
+ return (Path) m.invoke(repo, claim);
} catch (final Exception e) {
throw new RuntimeException("Could not invoke #getPath on
FileSystemRepository due to " + e);
}
@@ -694,9 +700,7 @@ public class TestFileSystemRepository {
// write at least 1 MB to the output stream so that when we close the
output stream
// the repo won't keep the stream open.
- final String maxAppendableClaimLength =
nifiProperties.getMaxAppendableClaimSize();
- final int maxClaimLength =
DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue();
- final byte[] buff = new byte[maxClaimLength];
+ final byte[] buff = new byte[(int) maxClaimLength];
out.write(buff);
out.write(buff);
@@ -897,14 +901,267 @@ public class TestFileSystemRepository {
}
}
+ @Test
+ public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim()
throws IOException {
+ // Create a small claim at offset 0. Write less data than
maxAppendableClaimLength so the ResourceClaim
+ // is recycled back to the writable queue.
+ final ContentClaim smallClaim = repository.create(false);
+ final byte[] smallData = new byte[100];
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(smallData);
+ }
+ assertFalse(smallClaim.isTruncationCandidate());
+
+ // Now create a large claim on potentially the same ResourceClaim,
writing more than maxAppendableClaimLength
+ // to freeze the ResourceClaim. Because smallClaim was small and
recycled, largeClaim will be at a non-zero
+ // offset on the same ResourceClaim.
+ final ContentClaim largeClaim = repository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(largeData);
+ }
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Negative case: create a standalone large claim at offset 0 (fresh
ResourceClaim)
+ // To ensure a fresh ResourceClaim, write large data to all writable
claims to exhaust them,
+ // then create a new claim that starts at offset 0.
+ // The simplest approach: create claims until we get one at offset 0.
+ ContentClaim offsetZeroClaim = null;
+ for (int i = 0; i < 20; i++) {
+ final ContentClaim candidate = repository.create(false);
+ if (candidate instanceof StandardContentClaim standardContentClaim
&& standardContentClaim.getOffset() == 0) {
+ // Write large data that exceeds maxAppendableClaimLength
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ offsetZeroClaim = candidate;
+ break;
+ } else {
+ // Write large data to exhaust this claim's ResourceClaim
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ }
+ }
+
+ assertNotNull(offsetZeroClaim);
+ assertFalse(offsetZeroClaim.isTruncationCandidate());
+ }
+
+ @Test
+ public void testIncrementClaimantCountClearsTruncationCandidate() throws
IOException {
+ // Create a small claim to start a ResourceClaim, then a large claim
to freeze it
+ final ContentClaim smallClaim = repository.create(false);
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = repository.create(false);
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Simulate a clone by incrementing claimant count
+ repository.incrementClaimaintCount(largeClaim);
+
+ assertFalse(largeClaim.isTruncationCandidate());
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateClaimReducesFileSizeAndPreservesEarlierData()
throws IOException, InterruptedException {
+ // We need to create our own repository that overrides
getContainerUsableSpace to simulate disk pressure
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return 0; // Extreme disk pressure
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ // Create a small claim then a large claim on the same
ResourceClaim
+ final ContentClaim smallClaim = localRepository.create(false);
+ final byte[] smallData = "Hello World - small claim
data".getBytes(StandardCharsets.UTF_8);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(smallData);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+ new Random().nextBytes(largeData);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(largeData);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Both claims should share the same resource claim
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ // Get the file path
+ final Path filePath = getPath(localRepository, smallClaim);
+ assertNotNull(filePath);
+ final long originalSize = Files.size(filePath);
+ assertTrue(originalSize > maxClaimLength);
+
+ // Decrement claimant count for the large claim to 0 (small claim
still holds a reference)
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+ // Mark the large claim as truncatable
+ localClaimManager.markTruncatable(largeClaim);
+
+ // Wait for the TruncateClaims background task to truncate the
file. Poll the file size until it shrinks.
+ final long expectedTruncatedSize = largeClaim.getOffset();
+ while (Files.size(filePath) != expectedTruncatedSize) {
+ Thread.sleep(100L);
+ }
+
+ // Verify the small claim's data is still fully readable
+ try (final InputStream in = localRepository.read(smallClaim)) {
+ final byte[] readData = readFully(in, smallData.length);
+ assertArrayEquals(smallData, readData);
+ }
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateNotActiveWhenDiskNotPressured() throws
IOException, InterruptedException {
+ // Create repository with ample disk space
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return Long.MAX_VALUE; // Plenty of space
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ final ContentClaim smallClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 4096]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ final Path filePath = getPath(localRepository, smallClaim);
+ final long originalSize = Files.size(filePath);
+
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+ localClaimManager.markTruncatable(largeClaim);
+
+ Thread.sleep(3000L);
+ assertEquals(originalSize, Files.size(filePath));
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
+ @Test
+ @Timeout(90)
+ public void testTruncateClaimDeferredThenExecutedWhenPressureStarts()
throws IOException, InterruptedException {
+ // Create a repository where disk pressure can be toggled
+ shutdown();
+
+ final AtomicLong usableSpace = new AtomicLong(Long.MAX_VALUE);
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return usableSpace.get();
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ // Create a small claim then a large claim on the same
ResourceClaim
+ final ContentClaim smallClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 4096]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ final Path filePath = getPath(localRepository, smallClaim);
+ final long originalSize = Files.size(filePath);
+
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+ localClaimManager.markTruncatable(largeClaim);
+
+ // Wait for at least one run of the background task with NO
pressure.
+ // File should NOT be truncated.
+ Thread.sleep(3_000);
+ assertEquals(originalSize, Files.size(filePath));
+
+ // Now turn on disk pressure
+ usableSpace.set(0);
+
+ // Wait for the next background task run to truncate the file
+ final long expectedTruncatedSize = largeClaim.getOffset();
+ while (Files.size(filePath) != expectedTruncatedSize) {
+ Thread.sleep(100L);
+ }
+
+ // Verify the small claim's data is still readable
+ try (final InputStream in = localRepository.read(smallClaim)) {
+ assertNotNull(in);
+ }
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
private byte[] readFully(final InputStream inStream, final int size)
throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
+ final ByteArrayOutputStream outputStream = new
ByteArrayOutputStream(size);
int len;
final byte[] buffer = new byte[size];
while ((len = inStream.read(buffer)) >= 0) {
- baos.write(buffer, 0, len);
+ outputStream.write(buffer, 0, len);
}
- return baos.toByteArray();
+ return outputStream.toByteArray();
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 9d6118e8ebe..3a6453e3124 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -810,4 +810,287 @@ public class TestWriteAheadFlowFileRepository {
return swapLocation;
}
}
+
+ //
=========================================================================
+ // Truncation Feature: Helpers
+ //
=========================================================================
+
+ /**
+ * Creates a mock queue + connection + queueProvider wired together,
suitable for runtime truncation tests.
+ * Returns [claimManager, queueProvider, queue].
+ */
+ private record RuntimeRepoContext(StandardResourceClaimManager
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+ }
+
+ private RuntimeRepoContext createRuntimeRepoContext() {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final TestQueueProvider queueProvider = new TestQueueProvider();
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn("1234");
+
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+ final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+ when(queue.getIdentifier()).thenReturn("1234");
+ when(connection.getFlowFileQueue()).thenReturn(queue);
+ queueProvider.addConnection(connection);
+ return new RuntimeRepoContext(claimManager, queueProvider, queue);
+ }
+
+ private StandardContentClaim createClaim(final ResourceClaim rc, final
long offset, final long length, final boolean truncationCandidate) {
+ final StandardContentClaim claim = new StandardContentClaim(rc,
offset);
+ claim.setLength(length);
+ if (truncationCandidate) {
+ claim.setTruncationCandidate(true);
+ }
+ return claim;
+ }
+
+ private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository
repo, final FlowFileQueue queue,
+ final ContentClaim claim) throws
IOException {
+ final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", UUID.randomUUID().toString())
+ .contentClaim(claim)
+ .build();
+
+ final StandardRepositoryRecord createRecord = new
StandardRepositoryRecord(queue);
+ createRecord.setWorking(flowFile, false);
+ createRecord.setDestination(queue);
+ repo.updateRepository(List.of(createRecord));
+
+ final StandardRepositoryRecord deleteRecord = new
StandardRepositoryRecord(queue, flowFile);
+ deleteRecord.markForDelete();
+ repo.updateRepository(List.of(deleteRecord));
+ }
+
+ /**
+ * Writes FlowFiles (one per claim) to a new repo, closes it, then
recovers into a fresh repo
+ * and returns the recovered FlowFileRecords.
+ */
+ private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims)
throws IOException {
+ final ResourceClaimManager writeClaimManager = new
StandardResourceClaimManager();
+ final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+ final Connection writeConnection = Mockito.mock(Connection.class);
+ when(writeConnection.getIdentifier()).thenReturn("1234");
+
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+ final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+ final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234",
null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
+ when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+ writeQueueProvider.addConnection(writeConnection);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(writeClaimManager);
+ repo.loadFlowFiles(writeQueueProvider);
+
+ final List<RepositoryRecord> records = new ArrayList<>();
+ for (int i = 0; i < claims.length; i++) {
+ final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+ .id(i + 1L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-" +
String.format("%012d", i + 1))
+ .contentClaim(claims[i])
+ .build();
+ final StandardRepositoryRecord rec = new
StandardRepositoryRecord(writeQueue);
+ rec.setWorking(ff, false);
+ rec.setDestination(writeQueue);
+ records.add(rec);
+ }
+ repo.updateRepository(records);
+ }
+
+ // Recover
+ final List<FlowFileRecord> recovered = new ArrayList<>();
+ final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class);
+ when(recoveryQueue.getIdentifier()).thenReturn("1234");
+ doAnswer(invocation -> {
+ recovered.add((FlowFileRecord) invocation.getArguments()[0]);
+ return null;
+ }).when(recoveryQueue).put(any(FlowFileRecord.class));
+
+ final Connection recoveryConnection = Mockito.mock(Connection.class);
+ when(recoveryConnection.getIdentifier()).thenReturn("1234");
+ when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue);
+ final TestQueueProvider recoveryQueueProvider = new
TestQueueProvider();
+ recoveryQueueProvider.addConnection(recoveryConnection);
+
+ try (final WriteAheadFlowFileRepository repo2 = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo2.initialize(new StandardResourceClaimManager());
+ repo2.loadFlowFiles(recoveryQueueProvider);
+ }
+
+ return recovered;
+ }
+
+ private FlowFileRecord findRecoveredByOffset(final List<FlowFileRecord>
recovered, final long offset) {
+ return recovered.stream()
+ .filter(ff -> ff.getContentClaim() != null &&
ff.getContentClaim().getOffset() == offset)
+ .findFirst()
+ .orElse(null);
+ }
+
+ //
=========================================================================
+ // Truncation Feature: Runtime Tests
+ //
=========================================================================
+
+ @Test
+ public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue()
throws IOException {
+ final RuntimeRepoContext context = createRuntimeRepoContext();
+ final ResourceClaim resourceClaim =
context.claimManager().newResourceClaim("container", "section", "1", false,
false);
+ context.claimManager().incrementClaimantCount(resourceClaim);
+ context.claimManager().incrementClaimantCount(resourceClaim); // count
= 2 so that after delete decrement it stays > 0 (not destructable)
+ final StandardContentClaim contentClaim = createClaim(resourceClaim,
1024L, 5_000_000L, true);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(context.claimManager());
+ repo.loadFlowFiles(context.queueProvider());
+ createAndDeleteFlowFile(repo, context.queue(), contentClaim);
+ repo.checkpoint();
+ }
+
+ final List<ContentClaim> truncated = new ArrayList<>();
+ context.claimManager().drainTruncatableClaims(truncated, 100);
+ assertTrue(truncated.contains(contentClaim));
+ }
+
+ @Test
+ public void testDestructableClaimTakesPriorityOverTruncatable() throws
IOException {
+ final RuntimeRepoContext context = createRuntimeRepoContext();
+ final ResourceClaim resourceClaim =
context.claimManager().newResourceClaim("container", "section", "1", false,
false);
+ context.claimManager().incrementClaimantCount(resourceClaim); // count
= 1 -- will reach 0 after delete
+ final StandardContentClaim contentClaim = createClaim(resourceClaim,
1024L, 5_000_000L, true);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(context.claimManager());
+ repo.loadFlowFiles(context.queueProvider());
+ createAndDeleteFlowFile(repo, context.queue(), contentClaim);
+ repo.checkpoint();
+ }
+
+ final List<ResourceClaim> destructed = new ArrayList<>();
+ context.claimManager().drainDestructableClaims(destructed, 100);
+ assertTrue(destructed.contains(resourceClaim));
+
+ final List<ContentClaim> truncated = new ArrayList<>();
+ context.claimManager().drainTruncatableClaims(truncated, 100);
+ assertFalse(truncated.contains(contentClaim));
+ }
+
+ @Test
+ public void testUpdateRecordOriginalClaimQueuedForTruncation() throws
IOException {
+ final RuntimeRepoContext context = createRuntimeRepoContext();
+
+ final ResourceClaim originalResourceClaim =
context.claimManager().newResourceClaim("container", "section", "1", false,
false);
+ context.claimManager().incrementClaimantCount(originalResourceClaim);
+ context.claimManager().incrementClaimantCount(originalResourceClaim);
// count = 2 so it stays > 0 after decrement
+ final StandardContentClaim originalClaim =
createClaim(originalResourceClaim, 2048L, 5_000_000L, true);
+
+ final ResourceClaim newResourceClaim =
context.claimManager().newResourceClaim("container", "section", "2", false,
false);
+ context.claimManager().incrementClaimantCount(newResourceClaim);
+ final StandardContentClaim newClaim = createClaim(newResourceClaim,
0L, 100L, false);
+
+ final FlowFileRecord originalFlowFile = new
StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", UUID.randomUUID().toString())
+ .contentClaim(originalClaim)
+ .build();
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(context.claimManager());
+ repo.loadFlowFiles(context.queueProvider());
+
+ final StandardRepositoryRecord createRecord = new
StandardRepositoryRecord(context.queue());
+ createRecord.setWorking(originalFlowFile, false);
+ createRecord.setDestination(context.queue());
+ repo.updateRepository(List.of(createRecord));
+
+ final FlowFileRecord updatedFlowFile = new
StandardFlowFileRecord.Builder()
+ .fromFlowFile(originalFlowFile)
+ .contentClaim(newClaim)
+ .build();
+ final StandardRepositoryRecord updateRecord = new
StandardRepositoryRecord(context.queue(), originalFlowFile);
+ updateRecord.setWorking(updatedFlowFile, true);
+ updateRecord.setDestination(context.queue());
+ repo.updateRepository(List.of(updateRecord));
+ repo.checkpoint();
+ }
+
+ final List<ContentClaim> truncated = new ArrayList<>();
+ context.claimManager().drainTruncatableClaims(truncated, 100);
+ assertTrue(truncated.contains(originalClaim));
+ }
+
+ //
=========================================================================
+ // Truncation Feature: Recovery Tests
+ //
=========================================================================
+
+ @Test
+ public void testRecoveryMarksTruncationCandidateForLargeTailClaim() throws
IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim resourceClaim =
claimManager.newResourceClaim("container", "section", "1", false, false);
+ final StandardContentClaim smallClaim = createClaim(resourceClaim, 0L,
100L, false);
+ final StandardContentClaim largeClaim = createClaim(resourceClaim,
100L, 2_000_000L, false);
+
+ final List<FlowFileRecord> recovered = writeAndRecover(smallClaim,
largeClaim);
+
+ final FlowFileRecord recoveredLargeFlowFile =
findRecoveredByOffset(recovered, 100L);
+ assertNotNull(recoveredLargeFlowFile);
+
assertTrue(recoveredLargeFlowFile.getContentClaim().isTruncationCandidate());
+
+ final FlowFileRecord recoveredSmallFlowFile =
findRecoveredByOffset(recovered, 0L);
+ assertNotNull(recoveredSmallFlowFile);
+
assertFalse(recoveredSmallFlowFile.getContentClaim().isTruncationCandidate());
+ }
+
+ @Test
+ public void testRecoveryDoesNotMarkClonedClaim() throws IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim resourceClaim =
claimManager.newResourceClaim("container", "section", "1", false, false);
+ final StandardContentClaim sharedClaim = createClaim(resourceClaim,
100L, 2_000_000L, false);
+
+ // Two FlowFiles sharing the same claim (clone scenario)
+ final List<FlowFileRecord> recovered = writeAndRecover(sharedClaim,
sharedClaim);
+
+ for (final FlowFileRecord flowFile : recovered) {
+ if (flowFile.getContentClaim() != null) {
+
assertFalse(flowFile.getContentClaim().isTruncationCandidate());
+ }
+ }
+ }
+
+ @Test
+ public void testRecoveryOnlyMarksTailClaim() throws IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim resourceClaim =
claimManager.newResourceClaim("container", "section", "1", false, false);
+ final StandardContentClaim nonTailClaim = createClaim(resourceClaim,
100L, 2_000_000L, false);
+ final StandardContentClaim tailClaim = createClaim(resourceClaim,
2_000_100L, 3_000_000L, false);
+
+ final List<FlowFileRecord> recovered = writeAndRecover(nonTailClaim,
tailClaim);
+
+ final FlowFileRecord tailFlowFile = findRecoveredByOffset(recovered,
2_000_100L);
+ assertNotNull(tailFlowFile);
+ assertTrue(tailFlowFile.getContentClaim().isTruncationCandidate());
+
+ final FlowFileRecord nonTailFlowFile =
findRecoveredByOffset(recovered, 100L);
+ assertNotNull(nonTailFlowFile);
+ assertFalse(nonTailFlowFile.getContentClaim().isTruncationCandidate());
+ }
+
+ @Test
+ public void testRecoverySmallClaimAfterLargeDoesNotMarkLarge() throws
IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim resourceClaim =
claimManager.newResourceClaim("container", "section", "1", false, false);
+ final StandardContentClaim firstSmallClaim =
createClaim(resourceClaim, 0L, 100L, false);
+ final StandardContentClaim largeClaim = createClaim(resourceClaim,
100L, 2_000_000L, false);
+ final StandardContentClaim secondSmallClaim =
createClaim(resourceClaim, 2_000_100L, 50L, false);
+
+ final List<FlowFileRecord> recovered =
writeAndRecover(firstSmallClaim, largeClaim, secondSmallClaim);
+
+ final List<FlowFileRecord> flowFilesWithClaims = recovered.stream()
+ .filter(flowFile -> flowFile.getContentClaim() != null)
+ .toList();
+
+ assertFalse(flowFilesWithClaims.isEmpty());
+ for (final FlowFileRecord flowFile : flowFilesWithClaims) {
+ assertFalse(flowFile.getContentClaim().isTruncationCandidate(),
+ "No claim should be a truncation candidate because the
large claim is not the tail; claim offset=" +
flowFile.getContentClaim().getOffset());
+ }
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
index 814d3e81642..780a44f3699 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
@@ -29,6 +29,7 @@ public final class StandardContentClaim implements
ContentClaim, Comparable<Cont
private final ResourceClaim resourceClaim;
private final long offset;
private volatile long length;
+ private volatile boolean truncationCandidate = false;
public StandardContentClaim(final ResourceClaim resourceClaim, final long
offset) {
this.resourceClaim = resourceClaim;
@@ -40,6 +41,15 @@ public final class StandardContentClaim implements
ContentClaim, Comparable<Cont
this.length = length;
}
+ public void setTruncationCandidate(final boolean candidate) {
+ this.truncationCandidate = candidate;
+ }
+
+ @Override
+ public boolean isTruncationCandidate() {
+ return truncationCandidate;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
index 95687747487..1e483fc25f3 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -32,6 +32,7 @@ public class StandardResourceClaimManager implements
ResourceClaimManager {
private static final Logger logger =
LoggerFactory.getLogger(StandardResourceClaimManager.class);
private final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts =
new ConcurrentHashMap<>();
private final BlockingQueue<ResourceClaim> destructableClaims = new
LinkedBlockingQueue<>(50000);
+ private final BlockingQueue<ContentClaim> truncatableClaims = new
LinkedBlockingQueue<>(100000);
@Override
public ResourceClaim newResourceClaim(final String container, final String
section, final String id, final boolean lossTolerant, final boolean writable) {
@@ -161,6 +162,30 @@ public class StandardResourceClaimManager implements
ResourceClaimManager {
}
}
+ @Override
+ public void markTruncatable(final ContentClaim contentClaim) {
+ if (contentClaim == null) {
+ return;
+ }
+
+ final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+ synchronized (resourceClaim) {
+ if (isDestructable(resourceClaim)) {
+ return;
+ }
+
+ logger.debug("Marking {} as truncatable", contentClaim);
+ try {
+ if (!truncatableClaims.offer(contentClaim, 1,
TimeUnit.MINUTES)) {
                logger.info("Unable to mark {} as truncatable because
maximum queue size [{}] reached", contentClaim, truncatableClaims.size());
+ }
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ logger.debug("Interrupted while marking {} as truncatable",
contentClaim, ie);
+ }
+ }
+ }
+
@Override
public void drainDestructableClaims(final Collection<ResourceClaim>
destination, final int maxElements) {
final int drainedCount = destructableClaims.drainTo(destination,
maxElements);
@@ -179,6 +204,12 @@ public class StandardResourceClaimManager implements
ResourceClaimManager {
}
}
+ @Override
+ public void drainTruncatableClaims(final Collection<ContentClaim>
destination, final int maxElements) {
+ final int drainedCount = truncatableClaims.drainTo(destination,
maxElements);
+ logger.debug("Drained {} truncatable claims to {}", drainedCount,
destination);
+ }
+
@Override
public void purge() {
claimantCounts.clear();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
index 7fb77b2739c..dd36bdd230a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
@@ -21,12 +21,14 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestStandardResourceClaimManager {
@@ -56,4 +58,53 @@ public class TestStandardResourceClaimManager {
manager.drainDestructableClaims(new ArrayList<>(), 1);
assertSame(completedObject, future.get());
}
+
+ @Test
+ public void testMarkTruncatableSkipsDestructableResourceClaim() {
+ final StandardResourceClaimManager manager = new
StandardResourceClaimManager();
+
+ // Create a resource claim with claimant count 0 and mark it
destructable
+ final ResourceClaim rc = manager.newResourceClaim("container",
"section", "id1", false, false);
+ manager.markDestructable(rc);
+
+ // Create a content claim on that resource claim
+ final StandardContentClaim contentClaim = new StandardContentClaim(rc,
0);
+ contentClaim.setLength(1024);
+ contentClaim.setTruncationCandidate(true);
+
+ // markTruncatable should skip this because the resource claim is
already destructable
+ manager.markTruncatable(contentClaim);
+
+ // Drain truncatable claims - should be empty
+ final List<ContentClaim> truncated = new ArrayList<>();
+ manager.drainTruncatableClaims(truncated, 10);
+ assertTrue(truncated.isEmpty(), "Truncatable claims should be empty
because the resource claim is destructable");
+ }
+
+ @Test
+ public void testMarkTruncatableAndDrainRespectsMaxElements() {
+ final StandardResourceClaimManager manager = new
StandardResourceClaimManager();
+
+ // Create 5 truncatable claims, each on a distinct resource claim with
a positive claimant count
+ for (int i = 0; i < 5; i++) {
+ final ResourceClaim rc = manager.newResourceClaim("container",
"section", "id-" + i, false, false);
+ // Give each resource claim a positive claimant count so it's not
destructable
+ manager.incrementClaimantCount(rc);
+
+ final StandardContentClaim cc = new StandardContentClaim(rc, 0);
+ cc.setLength(1024);
+ cc.setTruncationCandidate(true);
+ manager.markTruncatable(cc);
+ }
+
+ // Drain with maxElements=3
+ final List<ContentClaim> batch1 = new ArrayList<>();
+ manager.drainTruncatableClaims(batch1, 3);
+ assertEquals(3, batch1.size(), "First drain should return exactly 3
claims");
+
+ // Drain again - should get remaining 2
+ final List<ContentClaim> batch2 = new ArrayList<>();
+ manager.drainTruncatableClaims(batch2, 10);
+ assertEquals(2, batch2.size(), "Second drain should return the
remaining 2 claims");
+ }
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
index 08c9fd2eca7..4b24f3af045 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
@@ -260,6 +260,11 @@ public class ByteArrayContentRepository implements
ContentRepository {
return resourceClaim.getLength();
}
+ @Override
+ public boolean isTruncationCandidate() {
+ return false;
+ }
+
@Override
public int compareTo(final ContentClaim o) {
return resourceClaim.compareTo(o.getResourceClaim());
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java
new file mode 100644
index 00000000000..c00ba93c6ce
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+@DefaultSchedule(period = "10 mins")
+public class GenerateTruncatableFlowFiles extends AbstractProcessor {
+
+ static final PropertyDescriptor BATCH_COUNT = new
PropertyDescriptor.Builder()
+ .name("Batch Count")
+ .description("""
+ The maximum number of batches to generate. Each batch produces
10 FlowFiles (9 small + 1 large). \
+ Once this many batches have been generated, no more FlowFiles
will be produced until the processor is stopped and restarted.""")
+ .required(true)
+ .defaultValue("10")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor SMALL_FILE_SIZE = new
PropertyDescriptor.Builder()
+ .name("Small File Size")
+ .description("Size of each small FlowFile in bytes")
+ .required(true)
+ .defaultValue("1 KB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor LARGE_FILE_SIZE = new
PropertyDescriptor.Builder()
+ .name("Large File Size")
+ .description("Size of each large FlowFile in bytes")
+ .required(true)
+ .defaultValue("10 MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor SMALL_FILES_PER_BATCH = new
PropertyDescriptor.Builder()
+ .name("Small Files Per Batch")
+ .description("Number of small FlowFiles to generate per batch")
+ .required(true)
+ .defaultValue("9")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .build();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return List.of(BATCH_COUNT,
+ SMALL_FILE_SIZE,
+ LARGE_FILE_SIZE,
+ SMALL_FILES_PER_BATCH);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Set.of(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ final int batchCount = context.getProperty(BATCH_COUNT).asInteger();
+ final Random random = new Random();
+ final int smallFileSize =
context.getProperty(SMALL_FILE_SIZE).asDataSize(DataUnit.B).intValue();
+ final int largeFileSize =
context.getProperty(LARGE_FILE_SIZE).asDataSize(DataUnit.B).intValue();
+ final int smallFilesPerBatch =
context.getProperty(SMALL_FILES_PER_BATCH).asInteger();
+
+ for (int batch = 0; batch < batchCount; batch++) {
+ // Generate small FlowFiles with priority = 10 (low priority,
processed last by PriorityAttributePrioritizer)
+ for (int i = 0; i < smallFilesPerBatch; i++) {
+ createFlowFile(session, random, smallFileSize, "10");
+ }
+
+ // Generate one large FlowFile with priority = 1 (high priority,
processed first by PriorityAttributePrioritizer)
+ createFlowFile(session, random, largeFileSize, "1");
+ }
+ }
+
+ private void createFlowFile(final ProcessSession session, final Random
random, final int fileSize, final String priority) {
+ FlowFile flowFile = session.create();
+ flowFile = session.putAttribute(flowFile, "priority", priority);
+ final byte[] data = new byte[fileSize];
+ random.nextBytes(data);
+ flowFile = session.write(flowFile, out -> out.write(data));
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a12f954cb84..d4a9b4c81cf 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -29,6 +29,7 @@ org.apache.nifi.processors.tests.system.FakeProcessor
org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
org.apache.nifi.processors.tests.system.GenerateAndCountCallbacks
org.apache.nifi.processors.tests.system.GenerateFlowFile
+org.apache.nifi.processors.tests.system.GenerateTruncatableFlowFiles
org.apache.nifi.processors.tests.system.HoldInput
org.apache.nifi.processors.tests.system.IngestFile
org.apache.nifi.processors.tests.system.LoopFlowFile
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationAfterRestartIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationAfterRestartIT.java
new file mode 100644
index 00000000000..012739ca872
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationAfterRestartIT.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.repositories;
+
+import org.apache.nifi.tests.system.NiFiInstance;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * System test that verifies the truncation feature works correctly after a
NiFi restart.
+ * <p>
+ * During the first run, NiFi is configured with very conservative truncation
settings (99% archive
+ * usage threshold), so truncation never activates. FlowFiles are generated
but not deleted.
+ * </p>
+ * <p>
+ * NiFi is then stopped, reconfigured with aggressive truncation settings (1%
archive usage threshold),
+ * and restarted. On recovery, {@code
WriteAheadFlowFileRepository.restoreFlowFiles()} re-derives
+ * truncation candidates by analyzing the recovered FlowFiles' ContentClaims.
After restart, the large
+ * FlowFiles are deleted, and the test verifies that the content repository
files are truncated on disk.
+ * </p>
+ */
+public class ContentClaimTruncationAfterRestartIT extends NiFiSystemIT {
+
+ @Override
+ protected Map<String, String> getNifiPropertiesOverrides() {
+ // Phase 1: Conservative settings — truncation should NOT occur
+ final Map<String, String> overrides = new HashMap<>();
+ overrides.put("nifi.flowfile.repository.checkpoint.interval", "1 sec");
+ overrides.put("nifi.content.claim.max.appendable.size", "50 KB");
+ // Very high archive threshold means no disk pressure, so truncation
never activates
+ overrides.put("nifi.content.repository.archive.max.usage.percentage",
"99%");
+ overrides.put("nifi.content.repository.archive.cleanup.frequency", "1
sec");
+ return overrides;
+ }
+
+ @Override
+ protected boolean isAllowFactoryReuse() {
+ return false;
+ }
+
+ @Test
+ public void testTruncationOccursAfterRestartWithRecoveredCandidates()
throws NiFiClientException, IOException, InterruptedException {
+ // === Phase 1: Generate FlowFiles with conservative settings (no
truncation) ===
+
+ final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateTruncatableFlowFiles");
+ final ProcessorEntity terminate =
getClientUtil().createProcessor("TerminateFlowFile");
+
+ final Map<String, String> generateProps = Map.of(
+ "Batch Count", "10",
+ "Small File Size", "1 KB",
+ "Large File Size", "10 MB",
+ "Small Files Per Batch", "9"
+ );
+ getClientUtil().updateProcessorProperties(generate, generateProps);
+
+ ConnectionEntity connection =
getClientUtil().createConnection(generate, terminate, "success");
+ connection = getClientUtil().updateConnectionPrioritizer(connection,
"PriorityAttributePrioritizer");
+ connection = getClientUtil().updateConnectionBackpressure(connection,
10000, 100L * 1024 * 1024);
+
+ // Generate all 100 FlowFiles (90 small @ 1 KB + 10 large @ 10 MB)
+ getClientUtil().startProcessor(generate);
+ waitForQueueCount(connection.getId(), 100);
+ getClientUtil().stopProcessor(generate);
+ getClientUtil().waitForStoppedProcessor(generate.getId());
+
+ // Verify the content repository is large — the 10 MB FlowFiles are on
disk
+ final File contentRepoDir = new
File(getNiFiInstance().getInstanceDirectory(), "content_repository");
+ final long thresholdBytes = 1024 * 1024; // 1 MB
+ final long sizeBeforeRestart = getContentRepoSize(contentRepoDir);
+ assertTrue(sizeBeforeRestart > thresholdBytes,
+ "Content repository should be large before restart, but was " +
sizeBeforeRestart + " bytes");
+
+ // === Phase 2: Stop NiFi, reconfigure for aggressive truncation, and
restart ===
+
+ final NiFiInstance nifiInstance = getNiFiInstance();
+ nifiInstance.stop();
+
+ // Switch archive threshold to 1% so truncation activates under disk
pressure
+ nifiInstance.setProperties(Map.of(
+ "nifi.content.repository.archive.max.usage.percentage", "1%"
+ ));
+
+ nifiInstance.start(true);
+
+ // After restart, WriteAheadFlowFileRepository.restoreFlowFiles()
should have re-derived
+ // that the 10 large tail claims are truncation candidates.
+
+ // Run TerminateFlowFile 10 times. Due to
PriorityAttributePrioritizer, the 10 large
+ // FlowFiles (priority=1) are dequeued first.
+ for (int i = 0; i < 10; i++) {
+ final ProcessorEntity terminateAfterRestart =
getNifiClient().getProcessorClient().getProcessor(terminate.getId());
+
getNifiClient().getProcessorClient().runProcessorOnce(terminateAfterRestart);
+
getClientUtil().waitForStoppedProcessor(terminateAfterRestart.getId());
+ }
+
+ waitForQueueCount(connection.getId(), 90);
+
+ // Wait for the content repository files to be truncated.
+ // Before truncation: ~10 files of ~10 MB each = ~100 MB total.
+ // After truncation: ~10 files of ~9 KB each = ~90 KB total.
+ waitFor(() -> {
+ try {
+ return getContentRepoSize(contentRepoDir) < thresholdBytes;
+ } catch (final Exception e) {
+ return false;
+ }
+ });
+
+ final long finalSize = getContentRepoSize(contentRepoDir);
+ assertTrue(finalSize < thresholdBytes,
+ "Content repository total size should be below " + thresholdBytes
+ " bytes after truncation, but was " + finalSize);
+ }
+
+ private long getContentRepoSize(final File dir) {
+ if (dir == null || !dir.exists()) {
+ return 0;
+ }
+
+ final File[] children = dir.listFiles();
+ if (children == null) {
+ return 0L;
+ }
+
+ long total = 0;
+ for (final File child : children) {
+ if (child.isDirectory()) {
+ if (child.getName().equals("archive")) {
+ continue; // Skip archive directories
+ }
+
+ total += getContentRepoSize(child);
+ } else {
+ total += child.length();
+ }
+ }
+
+ return total;
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationIT.java
new file mode 100644
index 00000000000..c8ae97087ff
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.repositories;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * System test that verifies the end-to-end truncation feature. It generates
FlowFiles with a pattern
+ * of 9 small (1 KB) + 1 large (10 MB) per batch, removes only the large
FlowFiles via priority-based
+ * ordering, and then verifies that the content repository files are truncated
on disk.
+ */
+public class ContentClaimTruncationIT extends NiFiSystemIT {
+
+ @Override
+ protected Map<String, String> getNifiPropertiesOverrides() {
+ final Map<String, String> overrides = new HashMap<>();
+ // Use a short checkpoint interval so truncatable claims are flushed
to the ResourceClaimManager promptly
+ overrides.put("nifi.flowfile.repository.checkpoint.interval", "1 sec");
+ overrides.put("nifi.content.repository.archive.cleanup.frequency", "1
sec");
+ // Explicitly set the max appendable claim size (same as system test
default, but explicit for clarity)
+ overrides.put("nifi.content.claim.max.appendable.size", "50 KB");
+ // Set archive threshold extremely low so that truncation occurs
quickly
+ overrides.put("nifi.content.repository.archive.max.usage.percentage",
"1%");
+ return overrides;
+ }
+
+ @Override
+ protected boolean isAllowFactoryReuse() {
+ // Don't reuse the NiFi instance since we override checkpoint interval
+ return false;
+ }
+
+ @Test
+ public void testLargeFlowFileTruncation() throws NiFiClientException,
IOException, InterruptedException {
+ // Create the processors
+ final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateTruncatableFlowFiles");
+ final ProcessorEntity terminate =
getClientUtil().createProcessor("TerminateFlowFile");
+
+ // Configure GenerateTruncatableFlowFiles with 10 batches (100
FlowFiles total)
+ final Map<String, String> generateProps = Map.of(
+ "Batch Count", "10",
+ "Small File Size", "1 KB",
+ "Large File Size", "10 MB",
+ "Small Files Per Batch", "9"
+ );
+ getClientUtil().updateProcessorProperties(generate, generateProps);
+
+ // Create connection with PriorityAttributePrioritizer and 100 MB
backpressure
+ ConnectionEntity connection =
getClientUtil().createConnection(generate, terminate, "success");
+ connection = getClientUtil().updateConnectionPrioritizer(connection,
"PriorityAttributePrioritizer");
+ connection = getClientUtil().updateConnectionBackpressure(connection,
10000, 100L * 1024 * 1024);
+
+ // Start the generator and wait for 100 FlowFiles to be queued
+ getClientUtil().startProcessor(generate);
+ waitForQueueCount(connection.getId(), 100);
+
+ // Stop the generator
+ getClientUtil().stopProcessor(generate);
+ getClientUtil().waitForStoppedProcessor(generate.getId());
+
+ // Run TerminateFlowFile 10 times. Due to PriorityAttributePrioritizer,
+ // the 10 large FlowFiles (priority=1) will be dequeued first.
+ for (int i = 0; i < 10; i++) {
+ getNifiClient().getProcessorClient().runProcessorOnce(terminate);
+ getClientUtil().waitForStoppedProcessor(terminate.getId());
+ }
+
+ // Wait for 90 FlowFiles remaining (the 10 large ones have been
removed)
+ waitForQueueCount(connection.getId(), 90);
+
+ // Wait for the content repository files to be truncated.
+ // Before truncation: ~10 files of ~10 MB each = ~100 MB total.
+ // After truncation: ~10 files of ~9 KB each = ~90 KB total.
+ // We set a generous threshold of 1 MB.
+ final File contentRepoDir = new
File(getNiFiInstance().getInstanceDirectory(), "content_repository");
+ final long thresholdBytes = 1024 * 1024; // 1 MB
+
+ waitFor(() -> {
+ try {
+ final long totalSize =
getContentRepoSize(contentRepoDir.toPath());
+ return totalSize < thresholdBytes;
+ } catch (final IOException e) {
+ return false;
+ }
+ });
+
+ // Final assertion
+ final long finalSize = getContentRepoSize(contentRepoDir.toPath());
+ assertTrue(finalSize < thresholdBytes,
+ "Content repository total size should be below " +
thresholdBytes + " bytes after truncation, but was " + finalSize);
+ }
+
+ /**
+ * Walks the content repository directory (excluding any "archive"
subdirectories)
+ * and returns the total size of all regular files.
+ */
+ private long getContentRepoSize(final Path contentRepoPath) throws
IOException {
+ if (!Files.exists(contentRepoPath)) {
+ return 0;
+ }
+
+ final AtomicLong totalSize = new AtomicLong(0);
+ Files.walkFileTree(contentRepoPath, new SimpleFileVisitor<>() {
+ @Override
+ public FileVisitResult preVisitDirectory(final Path dir, final
BasicFileAttributes attrs) {
+ // Skip archive directories
+ if (dir.getFileName() != null &&
"archive".equals(dir.getFileName().toString())) {
+ return FileVisitResult.SKIP_SUBTREE;
+ }
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult visitFile(final Path file, final
BasicFileAttributes attrs) {
+ totalSize.addAndGet(attrs.size());
+ return FileVisitResult.CONTINUE;
+ }
+ });
+
+ return totalSize.get();
+ }
+}