This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d22e8a  NIFI-6490 MergeRecord supports Variable Registry for 
MIN_RECORDS and MAX_RECORDS
1d22e8a is described below

commit 1d22e8a86d001540bb86d017a9139393778628cb
Author: Alessandro D'Armiento <alessandro.darmie...@agilelab.it>
AuthorDate: Fri Jul 26 17:14:11 2019 +0200

    NIFI-6490 MergeRecord supports Variable Registry for MIN_RECORDS and 
MAX_RECORDS
    
    Unified unit tests
    
    Added custom validation cases for MIN_RECORDS and MAX_RECORDS enforcing 
they are greater than zero.
    While MIN_RECORDS > 0 can fail individually, MAX_RECORDS > 0 validator 
cannot fail without having also at least another validation step (either the 
MIN_RECORDS > 0 or the MAX_RECORDS > MIN_RECORDS) to fail, since MIN_RECORDS is 
a required property with default value 1
    
    This closes #3607.
    
    Signed-off-by: Koji Kawamura <ijokaruma...@apache.org>
---
 .../nifi/processors/standard/MergeRecord.java      |  22 ++++-
 .../standard/merge/RecordBinManager.java           |   4 +-
 .../nifi/processors/standard/TestMergeRecord.java  | 100 +++++++++++++++++++++
 3 files changed, 122 insertions(+), 4 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index 130d6b6..797359e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -179,6 +179,7 @@ public class MergeRecord extends 
AbstractSessionFactoryProcessor {
         .required(true)
         .defaultValue("1")
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .build();
     public static final PropertyDescriptor MAX_RECORDS = new 
PropertyDescriptor.Builder()
         .name("max-records")
@@ -188,6 +189,7 @@ public class MergeRecord extends 
AbstractSessionFactoryProcessor {
         .required(false)
         .defaultValue("1000")
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .build();
     public static final PropertyDescriptor MAX_BIN_COUNT = new 
PropertyDescriptor.Builder()
         .name("max.bin.count")
@@ -268,8 +270,8 @@ public class MergeRecord extends 
AbstractSessionFactoryProcessor {
     protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
         final List<ValidationResult> results = new ArrayList<>();
 
-        final Integer minRecords = 
validationContext.getProperty(MIN_RECORDS).asInteger();
-        final Integer maxRecords = 
validationContext.getProperty(MAX_RECORDS).asInteger();
+        final Integer minRecords = 
validationContext.getProperty(MIN_RECORDS).evaluateAttributeExpressions().asInteger();
+        final Integer maxRecords = 
validationContext.getProperty(MAX_RECORDS).evaluateAttributeExpressions().asInteger();
         if (minRecords != null && maxRecords != null && maxRecords < 
minRecords) {
             results.add(new ValidationResult.Builder()
                 .subject("Max Records")
@@ -278,6 +280,22 @@ public class MergeRecord extends 
AbstractSessionFactoryProcessor {
                 .explanation("<Maximum Number of Records> property cannot be 
smaller than <Minimum Number of Records> property")
                 .build());
         }
+        if (minRecords != null && minRecords <= 0) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Min Records")
+                    .input(String.valueOf(minRecords))
+                    .valid(false)
+                    .explanation("<Minimum Number of Records> property cannot 
be negative or zero")
+                    .build());
+        }
+        if (maxRecords != null && maxRecords <= 0) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Max Records")
+                    .input(String.valueOf(maxRecords))
+                    .valid(false)
+                    .explanation("<Maximum Number of Records> property cannot 
be negative or zero")
+                    .build());
+        }
 
         final Double minSize = 
validationContext.getProperty(MIN_SIZE).asDataSize(DataUnit.B);
         final Double maxSize = 
validationContext.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index c271c2c..8b0b84e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -182,8 +182,8 @@ public class RecordBinManager {
 
 
     private RecordBinThresholds createThresholds(FlowFile flowfile) {
-        int minRecords = 
context.getProperty(MergeRecord.MIN_RECORDS).asInteger();
-        final int maxRecords = 
context.getProperty(MergeRecord.MAX_RECORDS).asInteger();
+        int minRecords = 
context.getProperty(MergeRecord.MIN_RECORDS).evaluateAttributeExpressions().asInteger();
+        final int maxRecords = 
context.getProperty(MergeRecord.MAX_RECORDS).evaluateAttributeExpressions().asInteger();
         final long minBytes = 
context.getProperty(MergeRecord.MIN_SIZE).asDataSize(DataUnit.B).longValue();
 
         final PropertyValue maxSizeValue = 
context.getProperty(MergeRecord.MAX_SIZE);
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index 2f235df..6123970 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -486,4 +486,104 @@ public class TestMergeRecord {
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
         runner.assertTransferCount(MergeRecord.REL_FAILURE, 1);
     }
+
+    @Test
+    public void testMergeWithMinRecordsFromVariableRegistry() {
+        runner.setVariable("min_records", "3");
+        runner.setVariable("max_records", "3");
+        runner.setValidateExpressionUsage(true);
+
+        // Test MIN_RECORDS
+        runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "3");
+
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.enqueue("Name, Age\nJane, 34");
+        runner.enqueue("Name, Age\nAlex, 28");
+
+        runner.run(1);
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 3);
+
+        final MockFlowFile mff = 
runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
+        mff.assertAttributeEquals("record.count", "3");
+        mff.assertContentEquals("header\nJohn,35\nJane,34\nAlex,28\n");
+        runner.clearTransferState();
+
+        // Test MAX_RECORDS
+        runner.setProperty(MergeRecord.MIN_RECORDS, "1");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
+
+        runner.enqueue("Name, Age\nJohn, 35");
+        runner.enqueue("Name, Age\nJane, 34");
+        runner.enqueue("Name, Age\nAlex, 28");
+        runner.enqueue("Name, Age\nDonna, 48");
+        runner.enqueue("Name, Age\nJoey, 45");
+
+        runner.run(2);
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 5);
+
+        final MockFlowFile mff1 = 
runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
+        mff1.assertAttributeEquals("record.count", "3");
+        mff1.assertContentEquals("header\nJohn,35\nJane,34\nAlex,28\n");
+
+        final MockFlowFile mff2 = 
runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(1);
+        mff2.assertAttributeEquals("record.count", "2");
+        mff2.assertContentEquals("header\nDonna,48\nJoey,45\n");
+        runner.clearTransferState();
+
+        runner.removeProperty("min_records");
+        runner.removeProperty("max_records");
+    }
+
+    @Test
+    public void testNegativeMinAndMaxRecordsValidators(){
+
+        runner.setVariable("min_records", "-3");
+        runner.setVariable("max_records", "-1");
+
+        // This configuration breaks the "<Minimum Number of Records> property 
cannot be negative or zero" rule
+        runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "3");
+        runner.assertNotValid();
+
+        // This configuration breaks the "<Minimum Number of Records> property 
cannot be negative or zero" and the
+        // "<Maximum Number of Records> property cannot be negative or zero" 
rules
+        runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
+        runner.assertNotValid();
+
+        // This configuration breaks the "<Maximum Number of Records> property 
cannot be smaller than <Minimum Number of Records> property"
+        // and the "<Maximum Number of Records> property cannot be negative or 
zero" rules
+        runner.setProperty(MergeRecord.MIN_RECORDS, "3");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
+        runner.assertNotValid();
+
+        // This configuration breaks the "<Maximum Number of Records> property 
cannot be smaller than <Minimum Number of Records> property"
+        // and the "<Maximum Number of Records> property cannot be negative or 
zero" rules
+        runner.removeProperty(MergeRecord.MIN_RECORDS); // Will use the 
default value of 1
+        runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
+        runner.assertNotValid();
+
+        // This configuration breaks the "<Maximum Number of Records> property 
cannot be smaller than <Minimum Number of Records> property",
+        // the "<Minimum Number of Records> property cannot be negative or 
zero" and the "<Maximum Number of Records>
+        // property cannot be negative or zero" rules
+        runner.setVariable("min_records", "-1");
+        runner.setVariable("max_records", "-3");
+        runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
+        runner.assertNotValid();
+
+        // This configuration is valid
+        runner.setVariable("min_records", "1");
+        runner.setVariable("max_records", "5");
+        runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
+        runner.assertValid();
+
+        runner.removeProperty("min_records");
+        runner.removeProperty("max_records");
+    }
+
 }

Reply via email to