This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 86cae18  NIFI-6599 Fix MergeRecord failure in defragment mode
86cae18 is described below

commit 86cae184ff1cfe8e81f7e1d2d393c39c17bca02a
Author: Koji Kawamura <[email protected]>
AuthorDate: Wed Aug 28 13:02:32 2019 +0900

    NIFI-6599 Fix MergeRecord failure in defragment mode
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #3678.
---
 .../nifi/processors/standard/MergeRecord.java      |  5 +-
 .../nifi/processors/standard/merge/RecordBin.java  | 95 ++++++++++++----------
 .../standard/merge/RecordBinManager.java           | 28 +++----
 .../standard/merge/RecordBinThresholds.java        |  9 ++
 .../nifi/processors/standard/TestMergeRecord.java  | 40 +++++++++
 5 files changed, 119 insertions(+), 58 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index 797359e..0c40aa6 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -353,9 +353,10 @@ public class MergeRecord extends 
AbstractSessionFactoryProcessor {
             session.commit();
         }
 
-        // If there is no more data queued up, complete any bin that meets our 
minimum threshold
+        // If there is no more data queued up, or strategy is defragment, 
complete any bin that meets our minimum threshold
+        // Otherwise, run one more cycle to process queued FlowFiles to add 
more fragment into available bins.
         int completedBins = 0;
-        if (flowFiles.isEmpty()) {
+        if (flowFiles.isEmpty() || 
MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
             try {
                 completedBins += manager.completeFullEnoughBins();
             } catch (final Exception e) {
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index e6ec2e6..c3dbbaa 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -148,6 +148,8 @@ public class RecordBin {
             flowFileMigrated = true;
             this.flowFiles.add(flowFile);
 
+            
thresholds.getFragmentCountAttribute().ifPresent(this::validateFragmentCount);
+
             if (recordCount >= getMinimumRecordCount()) {
                 // If we have met our minimum record count, we need to flush 
so that when we reach the desired number of bytes
                 // the bin is considered 'full enough'.
@@ -203,10 +205,6 @@ public class RecordBin {
                 return false;
             }
 
-            if(thresholds.getFragmentCountAttribute().isPresent() && 
this.fragmentCount == getMinimumRecordCount()) {
-                return true;
-            }
-
             int maxRecords = thresholds.getMaxRecords();
 
             if (recordCount >= maxRecords) {
@@ -242,6 +240,11 @@ public class RecordBin {
                 return false;
             }
 
+            if (thresholds.getFragmentCountAttribute().isPresent()) {
+                // Compare with the target fragment count.
+                return this.fragmentCount == thresholds.getFragmentCount();
+            }
+
             final int requiredRecordCount = getMinimumRecordCount();
             return (recordCount >= requiredRecordCount && 
out.getBytesWritten() >= thresholds.getMinBytes());
         } finally {
@@ -301,6 +304,48 @@ public class RecordBin {
         }
     }
 
+    /**
+     * Ensure that at least one FlowFile has a fragment.count attribute and 
that they all have the same value, if they have a value.
+     */
+    private void validateFragmentCount(String countAttributeName) {
+        Integer expectedFragmentCount = thresholds.getFragmentCount();
+        for (final FlowFile flowFile : flowFiles) {
+            final String countVal = flowFile.getAttribute(countAttributeName);
+            if (countVal == null) {
+                continue;
+            }
+
+            final int count;
+            try {
+                count = Integer.parseInt(countVal);
+            } catch (final NumberFormatException nfe) {
+                logger.error("Could not merge bin with {} FlowFiles because 
the '{}' attribute had a value of '{}' for {} but expected a number",
+                    new Object[] {flowFiles.size(), countAttributeName, 
countVal, flowFile});
+                fail();
+                return;
+            }
+
+            if (expectedFragmentCount != null && count != 
expectedFragmentCount) {
+                logger.error("Could not merge bin with {} FlowFiles because 
the '{}' attribute had a value of '{}' for {} but another FlowFile in the bin 
had a value of {}",
+                    new Object[] {flowFiles.size(), countAttributeName, 
countVal, flowFile, expectedFragmentCount});
+                fail();
+                return;
+            }
+
+            if (expectedFragmentCount == null) {
+                expectedFragmentCount = count;
+                thresholds.setFragmentCount(count);
+            }
+        }
+
+        if (expectedFragmentCount == null) {
+            logger.error("Could not merge bin with {} FlowFiles because the 
'{}' attribute was not present on any of the FlowFiles",
+                new Object[] {flowFiles.size(), countAttributeName});
+            fail();
+            return;
+        }
+    }
+
     public void complete(final String completionReason) throws IOException {
         writeLock.lock();
         try {
@@ -321,48 +366,16 @@ public class RecordBin {
                 return;
             }
 
-            // If using defragment mode, and we don't have enough FlowFiles, 
then we need to fail this bin.
             final Optional<String> countAttr = 
thresholds.getFragmentCountAttribute();
             if (countAttr.isPresent()) {
-                // Ensure that at least one FlowFile has a fragment.count 
attribute and that they all have the same value, if they have a value.
-                Integer expectedBinCount = null;
-                for (final FlowFile flowFile : flowFiles) {
-                    final String countVal = 
flowFile.getAttribute(countAttr.get());
-                    if (countVal == null) {
-                        continue;
-                    }
-
-                    final int count;
-                    try {
-                        count = Integer.parseInt(countVal);
-                    } catch (final NumberFormatException nfe) {
-                        logger.error("Could not merge bin with {} FlowFiles 
because the '{}' attribute had a value of '{}' for {} but expected a number",
-                                new Object[] {flowFiles.size(), 
countAttr.get(), countVal, flowFile});
-                        fail();
-                        return;
-                    }
-
-                    if (expectedBinCount != null && count != expectedBinCount) 
{
-                        logger.error("Could not merge bin with {} FlowFiles 
because the '{}' attribute had a value of '{}' for {} but another FlowFile in 
the bin had a value of {}",
-                                new Object[] {flowFiles.size(), 
countAttr.get(), countVal, flowFile, expectedBinCount});
-                        fail();
-                        return;
-                    }
-
-                    expectedBinCount = count;
-                }
-
-                if (expectedBinCount == null) {
-                    logger.error("Could not merge bin with {} FlowFiles 
because the '{}' attribute was not present on any of the FlowFiles",
-                            new Object[] {flowFiles.size(), countAttr.get()});
-                    fail();
-                    return;
-                }
+                validateFragmentCount(countAttr.get());
 
-                if (expectedBinCount != flowFiles.size()) {
+                // If using defragment mode, and we don't have enough 
FlowFiles, then we need to fail this bin.
+                Integer expectedFragmentCount = thresholds.getFragmentCount();
+                if (expectedFragmentCount != flowFiles.size()) {
                     logger.error("Could not merge bin with {} FlowFiles 
because the '{}' attribute had a value of '{}' but only {} of {} FlowFiles were 
encountered before this bin was evicted "
                                     + "(due to to Max Bin Age being reached or 
due to the Maximum Number of Bins being exceeded).",
-                            new Object[] {flowFiles.size(), countAttr.get(), 
expectedBinCount, flowFiles.size(), expectedBinCount});
+                            new Object[] {flowFiles.size(), countAttr.get(), 
expectedFragmentCount, flowFiles.size(), expectedFragmentCount});
                     fail();
                     return;
                 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index 8b0b84e..a84e724 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -27,7 +27,6 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processors.standard.MergeContent;
 import org.apache.nifi.processors.standard.MergeRecord;
 import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -197,9 +196,8 @@ public class RecordBinManager {
         final String mergeStrategy = 
context.getProperty(MergeRecord.MERGE_STRATEGY).getValue();
         if 
(MergeRecord.MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
             fragmentCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE;
-            if 
(!StringUtils.isEmpty(flowfile.getAttribute(fragmentCountAttribute))) {
-                minRecords = 
Integer.parseInt(flowfile.getAttribute(fragmentCountAttribute));
-            }
+            // We don't know minRecords in defragment mode.
+            minRecords = Integer.MAX_VALUE;
         } else {
             fragmentCountAttribute = null;
         }
@@ -240,15 +238,15 @@ public class RecordBinManager {
 
     public int completeExpiredBins() throws IOException {
         final long maxNanos = maxBinAgeNanos.get();
-        return handleCompletedBins(bin -> bin.isOlderThan(maxNanos, 
TimeUnit.NANOSECONDS));
+        return handleCompletedBins(bin -> bin.isOlderThan(maxNanos, 
TimeUnit.NANOSECONDS), "Bin has reached Max Bin Age");
     }
 
     public int completeFullEnoughBins() throws IOException {
-        return handleCompletedBins(RecordBin::isFullEnough);
+        return handleCompletedBins(RecordBin::isFullEnough, "Bin is full 
enough");
     }
 
-    private int handleCompletedBins(final Predicate<RecordBin> completionTest) 
throws IOException {
-        final Map<String, List<RecordBin>> expiredBinMap = new HashMap<>();
+    private int handleCompletedBins(final Predicate<RecordBin> completionTest, 
final String completionReason) throws IOException {
+        final Map<String, List<RecordBin>> completedBinMap = new HashMap<>();
 
         lock.lock();
         try {
@@ -258,7 +256,7 @@ public class RecordBinManager {
 
                 for (final RecordBin bin : bins) {
                     if (completionTest.test(bin)) {
-                        final List<RecordBin> expiredBinsForKey = 
expiredBinMap.computeIfAbsent(key, ignore -> new ArrayList<>());
+                        final List<RecordBin> expiredBinsForKey = 
completedBinMap.computeIfAbsent(key, ignore -> new ArrayList<>());
                         expiredBinsForKey.add(bin);
                     }
                 }
@@ -268,17 +266,17 @@ public class RecordBinManager {
         }
 
         int completed = 0;
-        for (final Map.Entry<String, List<RecordBin>> entry : 
expiredBinMap.entrySet()) {
+        for (final Map.Entry<String, List<RecordBin>> entry : 
completedBinMap.entrySet()) {
             final String key = entry.getKey();
-            final List<RecordBin> expiredBins = entry.getValue();
+            final List<RecordBin> completeBins = entry.getValue();
 
-            for (final RecordBin bin : expiredBins) {
-                logger.debug("Completing Bin {} because it has expired");
-                bin.complete("Bin has reached Max Bin Age");
+            for (final RecordBin bin : completeBins) {
+                logger.debug("Completing Bin {} because {}", new Object[]{bin, 
completionReason});
+                bin.complete(completionReason);
                 completed++;
             }
 
-            removeBins(key, expiredBins);
+            removeBins(key, completeBins);
         }
 
         return completed;
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
index 8f1a5c8..7cce4b9 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
@@ -27,6 +27,7 @@ public class RecordBinThresholds {
     private final long maxBinMillis;
     private final String maxBinAge;
     private final Optional<String> fragmentCountAttribute;
+    private Integer fragmentCount;
 
     public RecordBinThresholds(final int minRecords, final int maxRecords, 
final long minBytes, final long maxBytes, final long maxBinMillis,
         final String maxBinAge, final String fragmentCountAttribute) {
@@ -66,4 +67,12 @@ public class RecordBinThresholds {
     public Optional<String> getFragmentCountAttribute() {
         return fragmentCountAttribute;
     }
+
+    public Integer getFragmentCount() {
+        return fragmentCount;
+    }
+
+    public void setFragmentCount(Integer fragmentCount) {
+        this.fragmentCount = fragmentCount;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index 6123970..f6ca6a1 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -285,6 +285,46 @@ public class TestMergeRecord {
 
     }
 
+    @Test
+    public void testDefragmentWithMultipleRecordsOverMultipleCalls() {
+        runner.setProperty(MergeRecord.MERGE_STRATEGY, 
MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
+
+        final Map<String, String> attr1 = new HashMap<>();
+        attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+        attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
+        attr1.put("record.count", "2");
+
+        final Map<String, String> attr2 = new HashMap<>();
+        attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+        attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
+        attr2.put("record.count", "2");
+
+        runner.enqueue("Name, Age\nJohn, 35\nJane, 34", attr1);
+
+        runner.run(2);
+
+        assertEquals("Fragment id=1 should remain in the incoming connection", 
1, runner.getQueueSize().getObjectCount());
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
+
+        runner.enqueue("Name, Age\nJake, 3\nJan, 2", attr2);
+
+        runner.run(1);
+
+        assertEquals("Fragment id=1 should be merged", 0, 
runner.getQueueSize().getObjectCount());
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
+
+        final List<MockFlowFile> mffs = 
runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED);
+        assertEquals(1L, mffs.stream()
+            .filter(ff -> "4".equals(ff.getAttribute("record.count")))
+            .filter(ff -> 
"header\nJohn,35\nJane,34\nJake,3\nJan,2\n".equals(new 
String(ff.toByteArray())))
+            .count());
+
+    }
+
 
     @Test
     public void testMinSize() {

Reply via email to