This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 86cae18 NIFI-6599 Fix MergeRecord failure in defragment mode
86cae18 is described below
commit 86cae184ff1cfe8e81f7e1d2d393c39c17bca02a
Author: Koji Kawamura <[email protected]>
AuthorDate: Wed Aug 28 13:02:32 2019 +0900
NIFI-6599 Fix MergeRecord failure in defragment mode
Signed-off-by: Pierre Villard <[email protected]>
This closes #3678.
---
.../nifi/processors/standard/MergeRecord.java | 5 +-
.../nifi/processors/standard/merge/RecordBin.java | 95 ++++++++++++----------
.../standard/merge/RecordBinManager.java | 28 +++----
.../standard/merge/RecordBinThresholds.java | 9 ++
.../nifi/processors/standard/TestMergeRecord.java | 40 +++++++++
5 files changed, 119 insertions(+), 58 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index 797359e..0c40aa6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -353,9 +353,10 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
session.commit();
}
- // If there is no more data queued up, complete any bin that meets our
minimum threshold
+ // If there is no more data queued up, or strategy is defragment,
complete any bin that meets our minimum threshold
+ // Otherwise, run one more cycle to process queued FlowFiles to add
more fragment into available bins.
int completedBins = 0;
- if (flowFiles.isEmpty()) {
+ if (flowFiles.isEmpty() ||
MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
try {
completedBins += manager.completeFullEnoughBins();
} catch (final Exception e) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index e6ec2e6..c3dbbaa 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -148,6 +148,8 @@ public class RecordBin {
flowFileMigrated = true;
this.flowFiles.add(flowFile);
+
thresholds.getFragmentCountAttribute().ifPresent(this::validateFragmentCount);
+
if (recordCount >= getMinimumRecordCount()) {
// If we have met our minimum record count, we need to flush
so that when we reach the desired number of bytes
// the bin is considered 'full enough'.
@@ -203,10 +205,6 @@ public class RecordBin {
return false;
}
- if(thresholds.getFragmentCountAttribute().isPresent() &&
this.fragmentCount == getMinimumRecordCount()) {
- return true;
- }
-
int maxRecords = thresholds.getMaxRecords();
if (recordCount >= maxRecords) {
@@ -242,6 +240,11 @@ public class RecordBin {
return false;
}
+ if (thresholds.getFragmentCountAttribute().isPresent()) {
+ // Compare with the target fragment count.
+ return this.fragmentCount == thresholds.getFragmentCount();
+ }
+
final int requiredRecordCount = getMinimumRecordCount();
return (recordCount >= requiredRecordCount &&
out.getBytesWritten() >= thresholds.getMinBytes());
} finally {
@@ -301,6 +304,48 @@ public class RecordBin {
}
}
+ /**
+ * Ensure that at least one FlowFile has a fragment.count attribute and
that they all have the same value, if they have a value.
+ */
+ private void validateFragmentCount(String countAttributeName) {
+ Integer expectedFragmentCount = thresholds.getFragmentCount();
+ for (final FlowFile flowFile : flowFiles) {
+ final String countVal = flowFile.getAttribute(countAttributeName);
+ if (countVal == null) {
+ continue;
+ }
+
+ final int count;
+ try {
+ count = Integer.parseInt(countVal);
+ } catch (final NumberFormatException nfe) {
+ logger.error("Could not merge bin with {} FlowFiles because
the '{}' attribute had a value of '{}' for {} but expected a number",
+ new Object[] {flowFiles.size(), countAttributeName,
countVal, flowFile});
+ fail();
+ return;
+ }
+
+ if (expectedFragmentCount != null && count !=
expectedFragmentCount) {
+ logger.error("Could not merge bin with {} FlowFiles because
the '{}' attribute had a value of '{}' for {} but another FlowFile in the bin
had a value of {}",
+ new Object[] {flowFiles.size(), countAttributeName,
countVal, flowFile, expectedFragmentCount});
+ fail();
+ return;
+ }
+
+ if (expectedFragmentCount == null) {
+ expectedFragmentCount = count;
+ thresholds.setFragmentCount(count);
+ }
+ }
+
+ if (expectedFragmentCount == null) {
+ logger.error("Could not merge bin with {} FlowFiles because the
'{}' attribute was not present on any of the FlowFiles",
+ new Object[] {flowFiles.size(), countAttributeName});
+ fail();
+ return;
+ }
+ }
+
public void complete(final String completionReason) throws IOException {
writeLock.lock();
try {
@@ -321,48 +366,16 @@ public class RecordBin {
return;
}
- // If using defragment mode, and we don't have enough FlowFiles,
then we need to fail this bin.
final Optional<String> countAttr =
thresholds.getFragmentCountAttribute();
if (countAttr.isPresent()) {
- // Ensure that at least one FlowFile has a fragment.count
attribute and that they all have the same value, if they have a value.
- Integer expectedBinCount = null;
- for (final FlowFile flowFile : flowFiles) {
- final String countVal =
flowFile.getAttribute(countAttr.get());
- if (countVal == null) {
- continue;
- }
-
- final int count;
- try {
- count = Integer.parseInt(countVal);
- } catch (final NumberFormatException nfe) {
- logger.error("Could not merge bin with {} FlowFiles
because the '{}' attribute had a value of '{}' for {} but expected a number",
- new Object[] {flowFiles.size(),
countAttr.get(), countVal, flowFile});
- fail();
- return;
- }
-
- if (expectedBinCount != null && count != expectedBinCount)
{
- logger.error("Could not merge bin with {} FlowFiles
because the '{}' attribute had a value of '{}' for {} but another FlowFile in
the bin had a value of {}",
- new Object[] {flowFiles.size(),
countAttr.get(), countVal, flowFile, expectedBinCount});
- fail();
- return;
- }
-
- expectedBinCount = count;
- }
-
- if (expectedBinCount == null) {
- logger.error("Could not merge bin with {} FlowFiles
because the '{}' attribute was not present on any of the FlowFiles",
- new Object[] {flowFiles.size(), countAttr.get()});
- fail();
- return;
- }
+ validateFragmentCount(countAttr.get());
- if (expectedBinCount != flowFiles.size()) {
+ // If using defragment mode, and we don't have enough
FlowFiles, then we need to fail this bin.
+ Integer expectedFragmentCount = thresholds.getFragmentCount();
+ if (expectedFragmentCount != flowFiles.size()) {
logger.error("Could not merge bin with {} FlowFiles
because the '{}' attribute had a value of '{}' but only {} of {} FlowFiles were
encountered before this bin was evicted "
+ "(due to to Max Bin Age being reached or
due to the Maximum Number of Bins being exceeded).",
- new Object[] {flowFiles.size(), countAttr.get(),
expectedBinCount, flowFiles.size(), expectedBinCount});
+ new Object[] {flowFiles.size(), countAttr.get(),
expectedFragmentCount, flowFiles.size(), expectedFragmentCount});
fail();
return;
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index 8b0b84e..a84e724 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -27,7 +27,6 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.standard.MergeContent;
import org.apache.nifi.processors.standard.MergeRecord;
import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
@@ -197,9 +196,8 @@ public class RecordBinManager {
final String mergeStrategy =
context.getProperty(MergeRecord.MERGE_STRATEGY).getValue();
if
(MergeRecord.MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
fragmentCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE;
- if
(!StringUtils.isEmpty(flowfile.getAttribute(fragmentCountAttribute))) {
- minRecords =
Integer.parseInt(flowfile.getAttribute(fragmentCountAttribute));
- }
+ // We don't know minRecords in defragment mode.
+ minRecords = Integer.MAX_VALUE;
} else {
fragmentCountAttribute = null;
}
@@ -240,15 +238,15 @@ public class RecordBinManager {
public int completeExpiredBins() throws IOException {
final long maxNanos = maxBinAgeNanos.get();
- return handleCompletedBins(bin -> bin.isOlderThan(maxNanos,
TimeUnit.NANOSECONDS));
+ return handleCompletedBins(bin -> bin.isOlderThan(maxNanos,
TimeUnit.NANOSECONDS), "Bin has reached Max Bin Age");
}
public int completeFullEnoughBins() throws IOException {
- return handleCompletedBins(RecordBin::isFullEnough);
+ return handleCompletedBins(RecordBin::isFullEnough, "Bin is full
enough");
}
- private int handleCompletedBins(final Predicate<RecordBin> completionTest)
throws IOException {
- final Map<String, List<RecordBin>> expiredBinMap = new HashMap<>();
+ private int handleCompletedBins(final Predicate<RecordBin> completionTest,
final String completionReason) throws IOException {
+ final Map<String, List<RecordBin>> completedBinMap = new HashMap<>();
lock.lock();
try {
@@ -258,7 +256,7 @@ public class RecordBinManager {
for (final RecordBin bin : bins) {
if (completionTest.test(bin)) {
- final List<RecordBin> expiredBinsForKey =
expiredBinMap.computeIfAbsent(key, ignore -> new ArrayList<>());
+ final List<RecordBin> expiredBinsForKey =
completedBinMap.computeIfAbsent(key, ignore -> new ArrayList<>());
expiredBinsForKey.add(bin);
}
}
@@ -268,17 +266,17 @@ public class RecordBinManager {
}
int completed = 0;
- for (final Map.Entry<String, List<RecordBin>> entry :
expiredBinMap.entrySet()) {
+ for (final Map.Entry<String, List<RecordBin>> entry :
completedBinMap.entrySet()) {
final String key = entry.getKey();
- final List<RecordBin> expiredBins = entry.getValue();
+ final List<RecordBin> completeBins = entry.getValue();
- for (final RecordBin bin : expiredBins) {
- logger.debug("Completing Bin {} because it has expired");
- bin.complete("Bin has reached Max Bin Age");
+ for (final RecordBin bin : completeBins) {
+ logger.debug("Completing Bin {} because {}", new Object[]{bin,
completionReason});
+ bin.complete(completionReason);
completed++;
}
- removeBins(key, expiredBins);
+ removeBins(key, completeBins);
}
return completed;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
index 8f1a5c8..7cce4b9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
@@ -27,6 +27,7 @@ public class RecordBinThresholds {
private final long maxBinMillis;
private final String maxBinAge;
private final Optional<String> fragmentCountAttribute;
+ private Integer fragmentCount;
public RecordBinThresholds(final int minRecords, final int maxRecords,
final long minBytes, final long maxBytes, final long maxBinMillis,
final String maxBinAge, final String fragmentCountAttribute) {
@@ -66,4 +67,12 @@ public class RecordBinThresholds {
public Optional<String> getFragmentCountAttribute() {
return fragmentCountAttribute;
}
+
+ public Integer getFragmentCount() {
+ return fragmentCount;
+ }
+
+ public void setFragmentCount(Integer fragmentCount) {
+ this.fragmentCount = fragmentCount;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index 6123970..f6ca6a1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -285,6 +285,46 @@ public class TestMergeRecord {
}
+ @Test
+ public void testDefragmentWithMultipleRecordsOverMultipleCalls() {
+ runner.setProperty(MergeRecord.MERGE_STRATEGY,
MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
+
+ final Map<String, String> attr1 = new HashMap<>();
+ attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+ attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+ attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
+ attr1.put("record.count", "2");
+
+ final Map<String, String> attr2 = new HashMap<>();
+ attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+ attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+ attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
+ attr2.put("record.count", "2");
+
+ runner.enqueue("Name, Age\nJohn, 35\nJane, 34", attr1);
+
+ runner.run(2);
+
+ assertEquals("Fragment id=1 should remain in the incoming connection",
1, runner.getQueueSize().getObjectCount());
+ runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+ runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+
+ runner.enqueue("Name, Age\nJake, 3\nJan, 2", attr2);
+
+ runner.run(1);
+
+ assertEquals("Fragment id=1 should be merged", 0,
runner.getQueueSize().getObjectCount());
+ runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+ runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
+
+ final List<MockFlowFile> mffs =
runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED);
+ assertEquals(1L, mffs.stream()
+ .filter(ff -> "4".equals(ff.getAttribute("record.count")))
+ .filter(ff ->
"header\nJohn,35\nJane,34\nJake,3\nJan,2\n".equals(new
String(ff.toByteArray())))
+ .count());
+
+ }
+
@Test
public void testMinSize() {