This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d7f2eb7c26 NIFI-10818: Fixed bug in which the oldest bin in MergeContent
was not evicted to make room for a new bin when using a correlation attribute.
Added tests for JoinEnrichment to ensure that the changes had no adverse effects
on that processor. Running that test exposed a ConcurrentModificationException
in MockProcessSession, which is also fixed.
d7f2eb7c26 is described below

commit d7f2eb7c263aa9511dba5a2f5760f779ff1a12f9
Author: Mark Payne <[email protected]>
AuthorDate: Wed Nov 16 14:35:24 2022 -0500

    NIFI-10818: Fixed bug in which the oldest bin in MergeContent was not
    evicted to make room for a new bin when using a correlation attribute.
    Added tests for JoinEnrichment to ensure that the changes had no adverse
    effects on that processor. Running that test exposed a
    ConcurrentModificationException in MockProcessSession, which is also fixed.
    Also gave the JoinEnrichment record path property used by the insert
    enrichment fields strategy a default value of "/".
    
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #6668
---
 .../org/apache/nifi/util/MockProcessSession.java   |  2 +-
 .../apache/nifi/processor/util/bin/BinFiles.java   | 53 ++++++++++++++----
 .../nifi/processors/standard/JoinEnrichment.java   |  1 +
 .../processors/standard/TestJoinEnrichment.java    | 64 ++++++++++++++++++++++
 4 files changed, 107 insertions(+), 13 deletions(-)

diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index bb9d40d5b2..2b8326ed50 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -136,7 +136,7 @@ public class MockProcessSession implements ProcessSession {
     }
 
     public void migrate(final ProcessSession newOwner) {
-        migrate(newOwner, (Collection) currentVersions.values());
+        migrate(newOwner, new ArrayList<>((Collection) currentVersions.values())); // snapshot the live values() view; iterating it directly caused a ConcurrentModificationException (NIFI-10818)
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index d7a301b3e6..c9d40829dd 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -174,13 +174,13 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
     public final void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
         final int totalBinCount = binManager.getBinCount() + readyBins.size();
         final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
-        final int flowFilesBinned;
+        final BinningResult binningResult;
 
-        if (totalBinCount < maxBinCount) {
-            flowFilesBinned = binFlowFiles(context, sessionFactory);
-            getLogger().debug("Binned {} FlowFiles", new Object[] 
{flowFilesBinned});
+        if (totalBinCount <= maxBinCount) {
+            binningResult = binFlowFiles(context, sessionFactory);
+            getLogger().debug("Binned {} FlowFiles", 
binningResult.getFlowFilesBinned());
         } else {
-            flowFilesBinned = 0;
+            binningResult = BinningResult.EMPTY;
             getLogger().debug("Will not bin any FlowFiles because {} bins 
already exist;"
                 + "will wait until bins have been emptied before any more are 
created", new Object[] {totalBinCount});
         }
@@ -189,25 +189,28 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
             return;
         }
 
-        final int binsMigrated = migrateBins(context, flowFilesBinned == 0);
+        final int binsMigrated = migrateBins(context, 
binningResult.getFlowFilesBinned() == 0, binningResult.isNewBinNeeded());
         final int binsProcessed = processBins(context, sessionFactory);
         //If we accomplished nothing then let's yield
-        if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
+        if (binningResult.getFlowFilesBinned() == 0 && binsMigrated == 0 && 
binsProcessed == 0) {
             context.yield();
         }
     }
 
-    private int migrateBins(final ProcessContext context, final boolean 
relaxFullnessConstraint) {
+    private int migrateBins(final ProcessContext context, final boolean 
relaxFullnessConstraint, final boolean newBinNeeded) {
         int added = 0;
         for (final Bin bin : 
binManager.removeReadyBins(relaxFullnessConstraint)) {
             this.readyBins.add(bin);
             added++;
         }
 
-        // if we have created all of the bins that are allowed, go ahead and 
remove the oldest one. If we don't do
+        // Evict the oldest bin if we were not able to evict any based on size 
(added = 0) and either we need a new bin,
+        // or we've already created too many. If we don't do
         // this, then we will simply wait for it to expire because we can't 
get any more FlowFiles into the
         // bins. So we may as well expire it now.
-        if (added == 0 && binManager.getBinCount() > 
context.getProperty(MAX_BIN_COUNT).asInteger()) {
+        final int currentBinCount = binManager.getBinCount();
+        final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
+        if (added == 0 && ((currentBinCount > maxBinCount) || (currentBinCount 
== maxBinCount && newBinNeeded))) {
             final Bin bin = binManager.removeOldestBin();
             if (bin != null) {
                 added++;
@@ -261,11 +264,12 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
         return processedBins;
     }
 
-    private int binFlowFiles(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
+    private BinningResult binFlowFiles(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
         int flowFilesBinned = 0;
 
         final ProcessSession session = sessionFactory.createSession();
         final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
+        boolean newBinNeeded = false;
         while (binManager.getBinCount() <= maxBinCount) {
             if (!isScheduled()) {
                 break;
@@ -292,16 +296,21 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
 
             for (final Map.Entry<String, List<FlowFile>> entry : 
flowFileGroups.entrySet()) {
                 final Set<FlowFile> unbinned = 
binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
+                if (!unbinned.isEmpty()) {
+                    newBinNeeded = true;
+                }
+
                 for (final FlowFile flowFile : unbinned) {
                     Bin bin = new Bin(sessionFactory.createSession(), 0, 
Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
                     bin.offer(flowFile, session);
                     this.readyBins.add(bin);
                 }
+
                 flowFilesBinned += entry.getValue().size();
             }
         }
 
-        return flowFilesBinned;
+        return new BinningResult(flowFilesBinned, newBinNeeded);
     }
 
     @OnScheduled
@@ -384,4 +393,24 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
 
         return problems;
     }
+
+    private static class BinningResult { // outcome of a single binFlowFiles() pass
+        private final int flowFilesBinned; // total FlowFiles pulled and offered to the BinManager in this pass
+        private final boolean newBinNeeded; // true when BinManager.offer() returned unbinned FlowFiles; lets migrateBins() evict the oldest bin
+
+        public BinningResult(final int flowFilesBinned, final boolean newBinNeeded) {
+            this.flowFilesBinned = flowFilesBinned;
+            this.newBinNeeded = newBinNeeded;
+        }
+
+        public int getFlowFilesBinned() {
+            return flowFilesBinned;
+        }
+
+        public boolean isNewBinNeeded() {
+            return newBinNeeded;
+        }
+        // Shared immutable result for a trigger in which no binning was attempted.
+        public static final BinningResult EMPTY = new BinningResult(0, false);
+    }
 }
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
index 73dc768927..34c04f9635 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
@@ -172,6 +172,7 @@ public class JoinEnrichment extends BinFiles {
             "does not point to any existing field in the original Record, the 
enrichment will not be inserted.")
         .required(true)
         .addValidator(new RecordPathValidator())
+        .defaultValue("/")
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .dependsOn(JOIN_STRATEGY, JOIN_INSERT_ENRICHMENT_FIELDS)
         .build();
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java
index 0f24c99255..000c2051f8 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java
@@ -41,6 +41,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -50,10 +51,73 @@ import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestJoinEnrichment {
     private static final File EXAMPLES_DIR = new 
File("src/test/resources/TestJoinEnrichment");
 
+    @Test
+    public void testManyQueued() throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(new JoinEnrichment());
+
+        final ArrayListRecordWriter writer = setupCsvServices(runner);
+        runner.setProperty(JoinEnrichment.JOIN_STRATEGY, JoinEnrichment.JOIN_SQL);
+        runner.setProperty(JoinEnrichment.SQL, "SELECT original.i, original.lower_letter, enrichment.upper_letter FROM original JOIN enrichment ON original.i = enrichment.i");
+
+        // Enqueue a flowfile where i=0, lower_letter=a; another with i=1, lower_letter=b; etc. up to i=25, lower_letter=z
+        for (int i = 0; i < 26; i++) {
+            final Map<String, String> originalAttributes = new HashMap<>();
+            originalAttributes.put("enrichment.group.id", String.valueOf(i));
+            originalAttributes.put("enrichment.role", "ORIGINAL");
+
+            final char letter = (char) ('a' + i);
+
+            runner.enqueue("i,lower_letter\n" + i + "," + letter, originalAttributes);
+        }
+
+        // Enqueue a flowfile where i=0, upper_letter=A; another with i=1, upper_letter=B; etc. up to i=25, upper_letter=Z
+        for (int i = 0; i < 26; i++) {
+            final Map<String, String> enrichmentAttributes = new HashMap<>();
+            enrichmentAttributes.put("enrichment.group.id", String.valueOf(i));
+            enrichmentAttributes.put("enrichment.role", "ENRICHMENT");
+
+            final char letter = (char) ('A' + i);
+            runner.enqueue("i,upper_letter\n" + i + "," + letter, enrichmentAttributes);
+        }
+
+        runner.run();
+
+        // Ensure that the result is i=0,lower_letter=a,upper_letter=A ... i=25,lower_letter=z,upper_letter=Z
+        runner.assertTransferCount(JoinEnrichment.REL_JOINED, 26);
+        runner.assertTransferCount(JoinEnrichment.REL_ORIGINAL, 52);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(26, written.size());
+
+        final BitSet found = new BitSet();
+        for (final Record outRecord : written) {
+            final RecordSchema schema = outRecord.getSchema();
+            assertEquals(RecordFieldType.STRING, schema.getField("i").get().getDataType().getFieldType());
+            assertEquals(RecordFieldType.STRING, schema.getField("lower_letter").get().getDataType().getFieldType());
+            assertEquals(RecordFieldType.STRING, schema.getField("upper_letter").get().getDataType().getFieldType());
+
+            final int id = outRecord.getAsInt("i");
+
+            final String expectedLower = "" + ((char) ('a' + id));
+            assertEquals(expectedLower, outRecord.getValue("lower_letter"));
+
+            final String expectedUpper = "" + ((char) ('A' + id));
+            assertEquals(expectedUpper, outRecord.getValue("upper_letter"));
+
+            assertEquals(outRecord.getAsString("lower_letter"), outRecord.getAsString("upper_letter").toLowerCase());
+
+            found.set(id);
+        }
+
+        for (int i = 0; i < 26; i++) {
+            assertTrue(found.get(i), "No joined record was written for i=" + i);
+        }
+    }
 
     @Test
     public void testSimpleSqlJoin() throws InitializationException {

Reply via email to