This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1a37c9ff9a NIFI-14717 Updated MergeContent to create a Bin when Max
Bin Age reached (#10068)
1a37c9ff9a is described below
commit 1a37c9ff9a363dc276a5d3936ae3cbca899571b0
Author: Dominik <[email protected]>
AuthorDate: Tue Jul 8 16:36:00 2025 +0200
NIFI-14717 Updated MergeContent to create a Bin when Max Bin Age reached
(#10068)
- Updated MergeContent to create a Bin when Max Bin Age reached regardless
of whether there are new FlowFiles also queued
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/processor/util/bin/BinManager.java | 6 ++---
.../nifi/processors/standard/TestMergeContent.java | 28 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 3 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
index 5e611d29fa..d05bc3fdcd 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
@@ -224,8 +224,8 @@ public class BinManager {
/**
* Finds all bins that are considered full and removes them from the
manager.
* <p/>
- * @param relaxFullnessConstraint if false will require bins to be full
before considered ready; if true bins only have to meet their minimum size
criteria or be 'old' and then they'll be
- * considered ready
+ * @param relaxFullnessConstraint if false will require bins to be full
before considered ready; if true bins only have to meet their minimum size
criteria. It does not affect the 'old' criteria,
+ * so bins that are older than
maxBinAgeSeconds will always be considered ready regardless of this parameter.
* @return bins that are considered full
*/
public Collection<Bin> removeReadyBins(boolean relaxFullnessConstraint) {
@@ -243,7 +243,7 @@ public class BinManager {
} else if (!relaxFullnessConstraint && bin.isFull()) {
//strict check
bin.setEvictionReason(bin.determineEvictionReason());
readyBins.add(bin);
- } else if (relaxFullnessConstraint &&
bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS)) {
+ } else if (bin.isOlderThan(maxBinAgeSeconds.get(),
TimeUnit.SECONDS)) {
bin.setEvictionReason(EvictionReason.TIMEOUT);
readyBins.add(bin);
} else { //it isn't time yet...
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index 1363645191..3ebc81c95f 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -40,6 +40,7 @@ import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -48,6 +49,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -1201,6 +1203,32 @@ public class TestMergeContent {
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
}
+ @Test
+ @Timeout(value = 5)
+ public void testBinReleasedWhenOnlyMaxBinAgeConditionIsFulfilled() {
+ runner.setProperty(MergeContent.MERGE_STRATEGY,
MergeContent.MergeStrategy.BIN_PACK);
+ runner.setProperty(MergeContent.MAX_BIN_AGE, "2 sec");
+ runner.setProperty(MergeContent.MIN_ENTRIES, "10");
+
+ final Duration pollInterval = Duration.ofSeconds(1);
+ while (true) {
+ try {
+ runner.enqueue(new byte[0]);
+ runner.run(1, false);
+ runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+ break;
+ } catch (final AssertionError | Exception e) {
+ try {
+ // Bin not released yet, wait and check again
+ Thread.sleep(pollInterval.toMillis());
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Polling interrupted", ie);
+ }
+ }
+ }
+ }
+
@Test
public void testUniqueAttributes() {
runner.setProperty(AttributeStrategyUtil.ATTRIBUTE_STRATEGY,
AttributeStrategyUtil.ATTRIBUTE_STRATEGY_ALL_UNIQUE);