This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d7f2eb7c26 NIFI-10818: Fixed bug in which the oldest bin in MergeContent
was not evicted to make room for a new bin when using a correlation
attribute. Added additional tests for JoinEnrichment to ensure that the
changes had no adverse effects on that processor. Running that test exposed a
bug (ConcurrentModificationException) in MockProcessSession, which is also
fixed.
d7f2eb7c26 is described below
commit d7f2eb7c263aa9511dba5a2f5760f779ff1a12f9
Author: Mark Payne <[email protected]>
AuthorDate: Wed Nov 16 14:35:24 2022 -0500
NIFI-10818: Fixed bug in which the oldest bin in MergeContent was not evicted
to make room for a new bin when using a correlation attribute. Added additional
tests for JoinEnrichment to ensure that the changes had no adverse effects on
that processor. Running that test exposed a bug (ConcurrentModificationException)
in MockProcessSession, which is also fixed.
Signed-off-by: Matthew Burgess <[email protected]>
This closes #6668
---
.../org/apache/nifi/util/MockProcessSession.java | 2 +-
.../apache/nifi/processor/util/bin/BinFiles.java | 53 ++++++++++++++----
.../nifi/processors/standard/JoinEnrichment.java | 1 +
.../processors/standard/TestJoinEnrichment.java | 64 ++++++++++++++++++++++
4 files changed, 107 insertions(+), 13 deletions(-)
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index bb9d40d5b2..2b8326ed50 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -136,7 +136,7 @@ public class MockProcessSession implements ProcessSession {
}
public void migrate(final ProcessSession newOwner) {
- migrate(newOwner, (Collection) currentVersions.values());
+ // Pass a snapshot copy of the values instead of the live view: handing over the live
+ // view led to a ConcurrentModificationException (NIFI-10818), presumably because the
+ // two-argument migrate(...) removes entries from currentVersions while iterating - TODO confirm.
+ migrate(newOwner, new ArrayList<>((Collection)
currentVersions.values()));
}
@Override
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index d7a301b3e6..c9d40829dd 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -174,13 +174,13 @@ public abstract class BinFiles extends
AbstractSessionFactoryProcessor {
public final void onTrigger(final ProcessContext context, final
ProcessSessionFactory sessionFactory) throws ProcessException {
final int totalBinCount = binManager.getBinCount() + readyBins.size();
final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
- final int flowFilesBinned;
+ final BinningResult binningResult;
- if (totalBinCount < maxBinCount) {
- flowFilesBinned = binFlowFiles(context, sessionFactory);
- getLogger().debug("Binned {} FlowFiles", new Object[]
{flowFilesBinned});
+ if (totalBinCount <= maxBinCount) {
+ binningResult = binFlowFiles(context, sessionFactory);
+ getLogger().debug("Binned {} FlowFiles",
binningResult.getFlowFilesBinned());
} else {
- flowFilesBinned = 0;
+ binningResult = BinningResult.EMPTY;
getLogger().debug("Will not bin any FlowFiles because {} bins
already exist;"
+ "will wait until bins have been emptied before any more are
created", new Object[] {totalBinCount});
}
@@ -189,25 +189,28 @@ public abstract class BinFiles extends
AbstractSessionFactoryProcessor {
return;
}
- final int binsMigrated = migrateBins(context, flowFilesBinned == 0);
+ final int binsMigrated = migrateBins(context,
binningResult.getFlowFilesBinned() == 0, binningResult.isNewBinNeeded());
final int binsProcessed = processBins(context, sessionFactory);
//If we accomplished nothing then let's yield
- if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
+ if (binningResult.getFlowFilesBinned() == 0 && binsMigrated == 0 &&
binsProcessed == 0) {
context.yield();
}
}
- private int migrateBins(final ProcessContext context, final boolean
relaxFullnessConstraint) {
+ private int migrateBins(final ProcessContext context, final boolean
relaxFullnessConstraint, final boolean newBinNeeded) {
int added = 0;
for (final Bin bin :
binManager.removeReadyBins(relaxFullnessConstraint)) {
this.readyBins.add(bin);
added++;
}
- // if we have created all of the bins that are allowed, go ahead and
remove the oldest one. If we don't do
+ // Evict the oldest bin if we were not able to evict any based on size
(added = 0) and either we need a new bin,
+ // or we've already created too many. If we don't do
// this, then we will simply wait for it to expire because we can't
get any more FlowFiles into the
// bins. So we may as well expire it now.
- if (added == 0 && binManager.getBinCount() >
context.getProperty(MAX_BIN_COUNT).asInteger()) {
+ final int currentBinCount = binManager.getBinCount();
+ final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
+ if (added == 0 && ((currentBinCount > maxBinCount) || (currentBinCount
== maxBinCount && newBinNeeded))) {
final Bin bin = binManager.removeOldestBin();
if (bin != null) {
added++;
@@ -261,11 +264,12 @@ public abstract class BinFiles extends
AbstractSessionFactoryProcessor {
return processedBins;
}
- private int binFlowFiles(final ProcessContext context, final
ProcessSessionFactory sessionFactory) {
+ private BinningResult binFlowFiles(final ProcessContext context, final
ProcessSessionFactory sessionFactory) {
int flowFilesBinned = 0;
final ProcessSession session = sessionFactory.createSession();
final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
+ boolean newBinNeeded = false;
while (binManager.getBinCount() <= maxBinCount) {
if (!isScheduled()) {
break;
@@ -292,16 +296,21 @@ public abstract class BinFiles extends
AbstractSessionFactoryProcessor {
for (final Map.Entry<String, List<FlowFile>> entry :
flowFileGroups.entrySet()) {
final Set<FlowFile> unbinned =
binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
+ if (!unbinned.isEmpty()) {
+ newBinNeeded = true;
+ }
+
for (final FlowFile flowFile : unbinned) {
Bin bin = new Bin(sessionFactory.createSession(), 0,
Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
bin.offer(flowFile, session);
this.readyBins.add(bin);
}
+
flowFilesBinned += entry.getValue().size();
}
}
- return flowFilesBinned;
+ return new BinningResult(flowFilesBinned, newBinNeeded);
}
@OnScheduled
@@ -384,4 +393,24 @@ public abstract class BinFiles extends
AbstractSessionFactoryProcessor {
return problems;
}
+
+ /**
+ * Outcome of a single binFlowFiles(...) pass: the number of FlowFiles that were binned,
+ * and whether BinManager.offer(...) returned any FlowFiles it could not place into a bin,
+ * meaning a new bin is needed (used by migrateBins to decide whether to evict the oldest bin).
+ * Immutable.
+ */
+ private static final class BinningResult {
+ /**
+ * Result used when no binning was attempted because the total bin count already exceeded
+ * the maximum. Declared final so the shared constant cannot be reassigned.
+ */
+ public static final BinningResult EMPTY = new BinningResult(0, false);
+
+ private final int flowFilesBinned;
+ private final boolean newBinNeeded;
+
+ /**
+ * @param flowFilesBinned number of FlowFiles handed to the BinManager during the pass
+ * @param newBinNeeded true if any FlowFile could not be placed into an existing bin
+ */
+ public BinningResult(final int flowFilesBinned, final boolean newBinNeeded) {
+ this.flowFilesBinned = flowFilesBinned;
+ this.newBinNeeded = newBinNeeded;
+ }
+
+ /** @return number of FlowFiles binned during the pass */
+ public int getFlowFilesBinned() {
+ return flowFilesBinned;
+ }
+
+ /** @return true if the BinManager rejected at least one FlowFile, so a new bin is needed */
+ public boolean isNewBinNeeded() {
+ return newBinNeeded;
+ }
+ }
}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
index 73dc768927..34c04f9635 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java
@@ -172,6 +172,7 @@ public class JoinEnrichment extends BinFiles {
"does not point to any existing field in the original Record, the
enrichment will not be inserted.")
.required(true)
.addValidator(new RecordPathValidator())
+ .defaultValue("/")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dependsOn(JOIN_STRATEGY, JOIN_INSERT_ENRICHMENT_FIELDS)
.build();
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java
index 0f24c99255..000c2051f8 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java
@@ -41,6 +41,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -50,10 +51,73 @@ import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestJoinEnrichment {
private static final File EXAMPLES_DIR = new
File("src/test/resources/TestJoinEnrichment");
+ /**
+ * Enqueues 26 ORIGINAL and 26 ENRICHMENT FlowFiles, each ORIGINAL/ENRICHMENT pair sharing a
+ * distinct enrichment.group.id (0-25), then calls runner.run() once and verifies that every
+ * pair was joined: 26 FlowFiles to REL_JOINED, all 52 inputs to REL_ORIGINAL, and each output
+ * record with id i carries lower_letter ('a' + i) and upper_letter ('A' + i).
+ * Added with NIFI-10818 to check that the BinFiles eviction change does not break
+ * JoinEnrichment when many distinct correlation groups are queued at once.
+ */
+ @Test
+ public void testManyQueued() throws InitializationException {
+ final TestRunner runner = TestRunners.newTestRunner(new
JoinEnrichment());
+
+ final ArrayListRecordWriter writer = setupCsvServices(runner);
+ runner.setProperty(JoinEnrichment.JOIN_STRATEGY,
JoinEnrichment.JOIN_SQL);
+ runner.setProperty(JoinEnrichment.SQL, "SELECT original.i,
original.lower_letter, enrichment.upper_letter FROM original JOIN enrichment ON
original.i = enrichment.i");
+
+ // Enqueue a flowfile where i=0, lower_letter=a; another with i=1,
lower_letter=b; etc. up to i=25, lower_letter=z
+ for (int i=0; i < 26; i++) {
+ final Map<String, String> originalAttributes = new HashMap<>();
+ originalAttributes.put("enrichment.group.id", String.valueOf(i));
+ originalAttributes.put("enrichment.role", "ORIGINAL");
+
+ final char letter = (char) ('a' + i);
+
+ runner.enqueue("i,lower_letter\n" + i + "," + letter,
originalAttributes);
+ }
+
+ // Enqueue a flowfile where i=0, upper_letter=A; another with i=1,
upper_letter=B; etc. up to i=25, upper_letter=Z
+ for (int i=0; i < 26; i++) {
+ final Map<String, String> enrichmentAttributes = new HashMap<>();
+ enrichmentAttributes.put("enrichment.group.id", String.valueOf(i));
+ enrichmentAttributes.put("enrichment.role", "ENRICHMENT");
+
+ final char letter = (char) ('A' + i);
+ runner.enqueue("i,upper_letter\n" + i + "," + letter,
enrichmentAttributes);
+ }
+
+ runner.run();
+
+ // Ensure that the result is i=0,lower_letter=a,upper_letter=A ...
i=25,lower_letter=z,upper_letter=Z
+ runner.assertTransferCount(JoinEnrichment.REL_JOINED, 26);
+ runner.assertTransferCount(JoinEnrichment.REL_ORIGINAL, 52);
+
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(26, written.size());
+
+ // Tracks which ids (0-25) appeared in the output, so missing joins are detected below
+ final BitSet found = new BitSet();
+ for (final Record outRecord : written) {
+ final RecordSchema schema = outRecord.getSchema();
+ assertEquals(RecordFieldType.STRING,
schema.getField("i").get().getDataType().getFieldType());
+ assertEquals(RecordFieldType.STRING,
schema.getField("lower_letter").get().getDataType().getFieldType());
+ assertEquals(RecordFieldType.STRING,
schema.getField("upper_letter").get().getDataType().getFieldType());
+
+ final int id = outRecord.getAsInt("i");
+
+ final String expectedLower = "" + ((char) ('a' + id));
+ assertEquals(expectedLower, outRecord.getValue("lower_letter"));
+
+ final String expectedUpper = "" + ((char) ('A' + id));
+ assertEquals(expectedUpper, outRecord.getValue("upper_letter"));
+
+ assertEquals(outRecord.getAsString("lower_letter"),
outRecord.getAsString("upper_letter").toLowerCase());
+
+ found.set(id);
+ }
+
+ // Every group id must have produced exactly one joined record
+ for (int i=0; i < 26; i++) {
+ assertTrue(found.get(i));
+ }
+ }
@Test
public void testSimpleSqlJoin() throws InitializationException {