This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a37c9ff9a NIFI-14717 Updated MergeContent to create a Bin when Max 
Bin Age reached (#10068)
1a37c9ff9a is described below

commit 1a37c9ff9a363dc276a5d3936ae3cbca899571b0
Author: Dominik <[email protected]>
AuthorDate: Tue Jul 8 16:36:00 2025 +0200

    NIFI-14717 Updated MergeContent to create a Bin when Max Bin Age reached 
(#10068)
    
    - Updated MergeContent to create a Bin when Max Bin Age reached regardless 
of whether there are new FlowFiles also queued
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../apache/nifi/processor/util/bin/BinManager.java |  6 ++---
 .../nifi/processors/standard/TestMergeContent.java | 28 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 3 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
 
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
index 5e611d29fa..d05bc3fdcd 100644
--- 
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
+++ 
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
@@ -224,8 +224,8 @@ public class BinManager {
     /**
      * Finds all bins that are considered full and removes them from the 
manager.
      * <p/>
-     * @param relaxFullnessConstraint if false will require bins to be full 
before considered ready; if true bins only have to meet their minimum size 
criteria or be 'old' and then they'll be
-     * considered ready
+     * @param relaxFullnessConstraint if false will require bins to be full 
before considered ready; if true bins only have to meet their minimum size 
criteria. It does not affect the 'old' criteria,
+     *                                so bins that are older than 
maxBinAgeSeconds will always be considered ready regardless of this parameter.
      * @return bins that are considered full
      */
     public Collection<Bin> removeReadyBins(boolean relaxFullnessConstraint) {
@@ -243,7 +243,7 @@ public class BinManager {
                     } else if (!relaxFullnessConstraint && bin.isFull()) { 
//strict check
                         bin.setEvictionReason(bin.determineEvictionReason());
                         readyBins.add(bin);
-                    } else if (relaxFullnessConstraint && 
bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS)) {
+                    } else if (bin.isOlderThan(maxBinAgeSeconds.get(), 
TimeUnit.SECONDS)) {
                         bin.setEvictionReason(EvictionReason.TIMEOUT);
                         readyBins.add(bin);
                     } else { //it isn't time yet...
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index 1363645191..3ebc81c95f 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -40,6 +40,7 @@ import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -48,6 +49,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1201,6 +1203,32 @@ public class TestMergeContent {
         runner.assertTransferCount(MergeContent.REL_MERGED, 1);
     }
 
+    @Test
+    @Timeout(value = 5)
+    public void testBinReleasedWhenOnlyMaxBinAgeConditionIsFulfilled() {
+        runner.setProperty(MergeContent.MERGE_STRATEGY, 
MergeContent.MergeStrategy.BIN_PACK);
+        runner.setProperty(MergeContent.MAX_BIN_AGE, "2 sec");
+        runner.setProperty(MergeContent.MIN_ENTRIES, "10");
+
+        final Duration pollInterval = Duration.ofSeconds(1);
+        while (true) {
+            try {
+                runner.enqueue(new byte[0]);
+                runner.run(1, false);
+                runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+                break;
+            } catch (final AssertionError | Exception e) {
+                try {
+                    // Bin not released yet, wait and check again
+                    Thread.sleep(pollInterval.toMillis());
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Polling interrupted", ie);
+                }
+            }
+        }
+    }
+
     @Test
     public void testUniqueAttributes() {
         runner.setProperty(AttributeStrategyUtil.ATTRIBUTE_STRATEGY, 
AttributeStrategyUtil.ATTRIBUTE_STRATEGY_ALL_UNIQUE);

Reply via email to