markap14 commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2783113669
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1035,6 +1050,120 @@ public void purge() {
resourceClaimManager.purge();
}
+
+ private class TruncateClaims implements Runnable {
+
+ /**
+  * Runs a single truncation pass.
+  *
+  * <p>First, for every container in {@code containerNames} for which truncation is currently active,
+  * removes the claims held by the {@code truncationClaimManager} for that container and truncates them.
+  * Then repeatedly drains truncatable claims from the {@code resourceClaimManager}, handing each
+  * non-empty batch to {@link #truncateClaims(List, Map)}, until a drain yields no claims.</p>
+  */
+ @Override
+ public void run() {
+     // Activation decisions are remembered per container for the duration of this run.
+     final Map<String, Boolean> activationByContainer = new HashMap<>();
+
+     // Phase 1: revisit claims that were saved earlier and truncate them now, but only for
+     // containers whose truncation is currently active.
+     for (final String containerName : containerNames) {
+         if (!isTruncationActiveForContainer(containerName, activationByContainer)) {
+             continue;
+         }
+
+         final List<ContentClaim> savedClaims = truncationClaimManager.removeTruncationClaims(containerName);
+         if (!savedClaims.isEmpty()) {
+             truncateClaims(savedClaims, activationByContainer);
+         }
+     }
+
+     // Phase 2: drain truncation claims from the Resource Claim Manager, one batch at a time.
+     // Claims that can be truncated now are truncated; the rest are saved in the Truncation Claim
+     // Manager by truncateClaims so that they can be truncated on a later run. Otherwise we could
+     // skip a big claim while disk space is plentiful and then lose the opportunity to truncate it
+     // once disk space starts to run out.
+     for (;;) {
+         final List<ContentClaim> batch = new ArrayList<>();
+         // NOTE(review): 10_000 is presumably the maximum number of claims drained per call; confirm.
+         resourceClaimManager.drainTruncatableClaims(batch, 10_000);
+         if (batch.isEmpty()) {
+             break;
+         }
+
+         truncateClaims(batch, activationByContainer);
+     }
+ }
+
+ /**
+  * Truncates every claim whose container currently has truncation active and that is still flagged
+  * as a truncation candidate. Claims whose container does not have truncation active are grouped by
+  * container and handed to the {@code truncationClaimManager} once the whole list has been processed.
+  *
+  * @param toTruncate the claims to consider for truncation
+  * @param truncationActivationCache per-container activation decisions, passed through to
+  *                                  {@link #isTruncationActiveForContainer(String, Map)}
+  */
+ private void truncateClaims(final List<ContentClaim> toTruncate, final Map<String, Boolean> truncationActivationCache) {
+     final Map<String, List<ContentClaim>> deferredByContainer = new HashMap<>();
+
+     for (final ContentClaim candidate : toTruncate) {
+         final String containerName = candidate.getResourceClaim().getContainer();
+
+         if (isTruncationActiveForContainer(containerName, truncationActivationCache)) {
+             // A claim that is no longer a truncation candidate is neither truncated nor deferred.
+             if (candidate.isTruncationCandidate()) {
+                 truncate(candidate);
+             }
+         } else {
+             LOG.debug("Will not truncate {} because truncation is not active for container {}; will save for later truncation.", candidate, containerName);
+
+             List<ContentClaim> deferred = deferredByContainer.get(containerName);
+             if (deferred == null) {
+                 deferred = new ArrayList<>();
+                 deferredByContainer.put(containerName, deferred);
+             }
+             deferred.add(candidate);
+         }
+     }
+
+     // Hand the deferred claims over one container at a time.
+     deferredByContainer.forEach(truncationClaimManager::addTruncationClaims);
+ }
+
+ /**
+  * Decides whether claims in the given container should be truncated right now.
+  *
+  * <p>When data is not being archived ({@code archiveData} is {@code false}) truncation is always
+  * active and the cache is not consulted. Otherwise the decision is computed at most once per
+  * container for a given cache and is {@code true} only if the archive was cleared on the last run
+  * and the container's usable space is below the minimum configured for it in
+  * {@code minUsableContainerBytesForArchive}. A container with no configured minimum is never active.</p>
+  *
+  * <p>If the usable space cannot be determined, the container is treated as inactive and that
+  * result is cached as well, so the failing lookup (and its warning) is not repeated for every
+  * remaining claim of the same container that shares this cache.</p>
+  *
+  * @param container the name of the content repository container
+  * @param activationCache decisions already made with this cache, keyed by container name; updated
+  *                        by this method
+  * @return {@code true} if claims in the container should be truncated, {@code false} otherwise
+  */
+ private boolean isTruncationActiveForContainer(final String container, final Map<String, Boolean> activationCache) {
+     // If not archiving data, we consider truncation always active.
+     if (!archiveData) {
+         return true;
+     }
+
+     final Boolean cachedValue = activationCache.get(container);
+     if (cachedValue != null) {
+         return cachedValue;
+     }
+
+     if (!isArchiveClearedOnLastRun(container)) {
+         LOG.debug("Truncation is not active for container {} because the archive was not cleared on the last run.", container);
+         activationCache.put(container, false);
+         return false;
+     }
+
+     final long usableSpace;
+     try {
+         usableSpace = getContainerUsableSpace(container);
+     } catch (final IOException ioe) {
+         LOG.warn("Failed to determine usable space for container {}. Will not truncate claims for this container.", container, ioe);
+         // Remember the failure so that the lookup is not retried, and the warning not re-logged,
+         // for every other claim in this container that is evaluated with the same cache.
+         activationCache.put(container, false);
+         return false;
+     }
+
+     final Long minUsableSpace = minUsableContainerBytesForArchive.get(container);
+     if (minUsableSpace != null && usableSpace < minUsableSpace) {
+         LOG.debug("Truncate is active for Container {} because usable space of {} bytes is below the desired threshold of {} bytes.",
+             container, usableSpace, minUsableSpace);
+
+         activationCache.put(container, true);
+         return true;
+     }
+
+     // Either no minimum is configured for this container or there is still enough usable space.
+     activationCache.put(container, false);
+     return false;
+ }
+
+ private void truncate(final ContentClaim claim) {
Review Comment:
Thanks for reviewing @pvillard31!
In short, no, that should not be possible. The only way we will ever queue
up the `ContentClaim` for truncation is if the FlowFile Repository is synced
to disk (typically on checkpoint, but also possible on every commit if the fsync
property in `nifi.properties` is set to `true`) and the Content Claim has
`truncationCandidate = true`. So at this point, the FlowFile Repository is the
owner of the Content Claim and no Processor has access to it, and the
Repository determines that there are no longer any references to it. As a
result, we'll only queue up the Content Claim for truncation if there's only 1
referencing FlowFile and that one referencing FlowFile is now being removed. So
no concerns about the claimant count going back up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]