This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 1d22e8a NIFI-6490 MergeRecord supports Variable Registry for MIN_RECORDS and MAX_RECORDS 1d22e8a is described below commit 1d22e8a86d001540bb86d017a9139393778628cb Author: Alessandro D'Armiento <alessandro.darmie...@agilelab.it> AuthorDate: Fri Jul 26 17:14:11 2019 +0200 NIFI-6490 MergeRecord supports Variable Registry for MIN_RECORDS and MAX_RECORDS Unified unit tests Added custom validation cases for MIN_RECORDS and MAX_RECORDS enforcing they are greater than zero. While MIN_RECORDS > 0 can fail individually, MAX_RECORDS > 0 validator cannot fail without having also at least another validation step (either the MIN_RECORDS > 0 or the MAX_RECORDS > MIN_RECORDS) to fail, since MIN_RECORDS is a required property with default value 1 This closes #3607. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> --- .../nifi/processors/standard/MergeRecord.java | 22 ++++- .../standard/merge/RecordBinManager.java | 4 +- .../nifi/processors/standard/TestMergeRecord.java | 100 +++++++++++++++++++++ 3 files changed, 122 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java index 130d6b6..797359e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java @@ -179,6 +179,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor { .required(true) .defaultValue("1") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor MAX_RECORDS = new PropertyDescriptor.Builder() .name("max-records") @@ -188,6 +189,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor { .required(false) .defaultValue("1000") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder() .name("max.bin.count") @@ -268,8 +270,8 @@ public class MergeRecord extends AbstractSessionFactoryProcessor { protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { final List<ValidationResult> results = new ArrayList<>(); - final Integer minRecords = validationContext.getProperty(MIN_RECORDS).asInteger(); - final Integer maxRecords = validationContext.getProperty(MAX_RECORDS).asInteger(); + final Integer minRecords = validationContext.getProperty(MIN_RECORDS).evaluateAttributeExpressions().asInteger(); + final Integer maxRecords = validationContext.getProperty(MAX_RECORDS).evaluateAttributeExpressions().asInteger(); if (minRecords != null && maxRecords != null && maxRecords < minRecords) { results.add(new ValidationResult.Builder() .subject("Max Records") @@ -278,6 +280,22 @@ public class MergeRecord extends AbstractSessionFactoryProcessor { .explanation("<Maximum Number of Records> property cannot be smaller than <Minimum Number of Records> property") .build()); } + if (minRecords != null && minRecords <= 0) { + results.add(new ValidationResult.Builder() + .subject("Min Records") + .input(String.valueOf(minRecords)) + .valid(false) + .explanation("<Minimum Number of Records> property cannot be negative or zero") + .build()); + } + if (maxRecords != null && maxRecords <= 0) { + results.add(new ValidationResult.Builder() + .subject("Max Records") + .input(String.valueOf(maxRecords)) + .valid(false) + .explanation("<Maximum Number of Records> property cannot be negative or zero") + .build()); + } final Double minSize = validationContext.getProperty(MIN_SIZE).asDataSize(DataUnit.B); final Double maxSize = validationContext.getProperty(MAX_SIZE).asDataSize(DataUnit.B); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java index c271c2c..8b0b84e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java @@ -182,8 +182,8 @@ public class RecordBinManager { private RecordBinThresholds createThresholds(FlowFile flowfile) { - int minRecords = context.getProperty(MergeRecord.MIN_RECORDS).asInteger(); - final int maxRecords = context.getProperty(MergeRecord.MAX_RECORDS).asInteger(); + int minRecords = context.getProperty(MergeRecord.MIN_RECORDS).evaluateAttributeExpressions().asInteger(); + final int maxRecords = context.getProperty(MergeRecord.MAX_RECORDS).evaluateAttributeExpressions().asInteger(); final long minBytes = context.getProperty(MergeRecord.MIN_SIZE).asDataSize(DataUnit.B).longValue(); final PropertyValue maxSizeValue = context.getProperty(MergeRecord.MAX_SIZE); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java index 2f235df..6123970 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java @@ -486,4 +486,104 @@ public class TestMergeRecord { runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0); runner.assertTransferCount(MergeRecord.REL_FAILURE, 1); } + + @Test + public void testMergeWithMinRecordsFromVariableRegistry() { + runner.setVariable("min_records", "3"); + runner.setVariable("max_records", "3"); + runner.setValidateExpressionUsage(true); + + // Test MIN_RECORDS + runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}"); + runner.setProperty(MergeRecord.MAX_RECORDS, "3"); + + runner.enqueue("Name, Age\nJohn, 35"); + runner.enqueue("Name, Age\nJane, 34"); + runner.enqueue("Name, Age\nAlex, 28"); + + runner.run(1); + runner.assertTransferCount(MergeRecord.REL_MERGED, 1); + runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 3); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0); + mff.assertAttributeEquals("record.count", "3"); + mff.assertContentEquals("header\nJohn,35\nJane,34\nAlex,28\n"); + runner.clearTransferState(); + + // Test MAX_RECORDS + runner.setProperty(MergeRecord.MIN_RECORDS, "1"); + runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}"); + + runner.enqueue("Name, Age\nJohn, 35"); + runner.enqueue("Name, Age\nJane, 34"); + runner.enqueue("Name, Age\nAlex, 28"); + runner.enqueue("Name, Age\nDonna, 48"); + runner.enqueue("Name, Age\nJoey, 45"); + + runner.run(2); + runner.assertTransferCount(MergeRecord.REL_MERGED, 2); + runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 5); + + final MockFlowFile mff1 = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0); + mff1.assertAttributeEquals("record.count", "3"); + mff1.assertContentEquals("header\nJohn,35\nJane,34\nAlex,28\n"); + + final MockFlowFile mff2 = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(1); + mff2.assertAttributeEquals("record.count", "2"); + mff2.assertContentEquals("header\nDonna,48\nJoey,45\n"); + runner.clearTransferState(); + + runner.removeProperty("min_records"); + runner.removeProperty("max_records"); + } + + @Test + public void testNegativeMinAndMaxRecordsValidators(){ + + runner.setVariable("min_records", "-3"); + runner.setVariable("max_records", "-1"); + + // This configuration breaks the "<Minimum Number of Records> property cannot be negative or zero" rule + runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}"); + runner.setProperty(MergeRecord.MAX_RECORDS, "3"); + runner.assertNotValid(); + + // This configuration breaks the "<Minimum Number of Records> property cannot be negative or zero" and the + // "<Maximum Number of Records> property cannot be negative or zero" rules + runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}"); + runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}"); + runner.assertNotValid(); + + // This configuration breaks the "<Maximum Number of Records> property cannot be smaller than <Minimum Number of Records> property" + // and the "<Maximum Number of Records> property cannot be negative or zero" rules + runner.setProperty(MergeRecord.MIN_RECORDS, "3"); + runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}"); + runner.assertNotValid(); + + // This configuration breaks the "<Maximum Number of Records> property cannot be smaller than <Minimum Number of Records> property" + // and the "<Maximum Number of Records> property cannot be negative or zero" rules + runner.removeProperty(MergeRecord.MIN_RECORDS); // Will use the default value of 1 + runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}"); + runner.assertNotValid(); + + // This configuration breaks the "<Maximum Number of Records> property cannot be smaller than <Minimum Number of Records> property", + // the "<Minimum Number of Records> property cannot be negative or zero" and the "<Maximum Number of Records> + // property cannot be negative or zero" rules + runner.setVariable("min_records", "-1"); + runner.setVariable("max_records", "-3"); + runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}"); + runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}"); + runner.assertNotValid(); + + // This configuration is valid + runner.setVariable("min_records", "1"); + runner.setVariable("max_records", "5"); + runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}"); + runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}"); + runner.assertValid(); + + runner.removeProperty("min_records"); + runner.removeProperty("max_records"); + } + }