This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 8174854347 NIFI-14942 [ValidateCSV] Add 'Max Lines Per Row' property.
8174854347 is described below

commit 81748543475f9c080be81e52ea4df3d9186faa4f
Author: Filip Maretic <[email protected]>
AuthorDate: Fri Sep 5 16:59:55 2025 +0200

    NIFI-14942 [ValidateCSV] Add 'Max Lines Per Row' property.
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #10279.
---
 .../nifi/processors/standard/ValidateCsv.java      | 40 +++++++++++++--
 .../nifi/processors/standard/TestValidateCsv.java  | 57 ++++++++++++++++++++++
 2 files changed, 94 insertions(+), 3 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
index aa9bba5611..ac063b99c7 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
@@ -142,6 +142,20 @@ public class ValidateCsv extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor MAX_LINES_PER_ROW = new 
PropertyDescriptor.Builder()
+            .name("Max Lines Per Row")
+            .description("""
+                    The maximum number of lines that a row can span before an 
exception is thrown. This option allows
+                    the processor to fail fast when encountering CSV with 
mismatching quotes - the normal behaviour
+                    would be to continue reading until the matching quote is 
found, which could potentially mean reading
+                    the whole file (and exhausting all available memory). Zero 
value will disable this option.
+                    """)
+            .required(true)
+            .defaultValue("0")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
     public static final PropertyDescriptor DELIMITER_CHARACTER = new 
PropertyDescriptor.Builder()
             .name("validate-csv-delimiter")
             .displayName("Delimiter character")
@@ -200,6 +214,7 @@ public class ValidateCsv extends AbstractProcessor {
             HEADER,
             DELIMITER_CHARACTER,
             QUOTE_CHARACTER,
+            MAX_LINES_PER_ROW,
             END_OF_LINE_CHARACTER,
             VALIDATION_STRATEGY,
             INCLUDE_ALL_VIOLATIONS
@@ -265,9 +280,28 @@ public class ValidateCsv extends AbstractProcessor {
         // input is transferred over to Java as is. So when you type the 
characters "\"
         // and "n" into the UI the Java string will end up being those two 
characters
         // not the interpreted value "\n".
-        final String msgDemarcator = 
context.getProperty(END_OF_LINE_CHARACTER).evaluateAttributeExpressions(flowFile).getValue().replace("\\n",
 "\n").replace("\\r", "\r").replace("\\t", "\t");
-        return new 
CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).evaluateAttributeExpressions(flowFile).getValue().charAt(0),
-                
context.getProperty(DELIMITER_CHARACTER).evaluateAttributeExpressions(flowFile).getValue().charAt(0),
 msgDemarcator).build();
+        final String msgDemarcator = context.getProperty(END_OF_LINE_CHARACTER)
+                .evaluateAttributeExpressions(flowFile)
+                .getValue()
+                .replace("\\n", "\n")
+                .replace("\\r", "\r")
+                .replace("\\t", "\t");
+
+        final char quoteChar = context.getProperty(QUOTE_CHARACTER)
+                .evaluateAttributeExpressions(flowFile)
+                .getValue()
+                .charAt(0);
+
+        final int delimiterChar = context.getProperty(DELIMITER_CHARACTER)
+                .evaluateAttributeExpressions(flowFile)
+                .getValue()
+                .charAt(0);
+
+        final int maxLinesPerRow = 
context.getProperty(MAX_LINES_PER_ROW).asInteger();
+
+        return new CsvPreference.Builder(quoteChar, delimiterChar, 
msgDemarcator)
+                .maxLinesPerRow(maxLinesPerRow)
+                .build();
     }
 
     /**
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
index 43c92263d0..ab22f2a79e 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
@@ -28,6 +28,63 @@ public class TestValidateCsv {
 
     private final TestRunner runner = TestRunners.newTestRunner(new 
ValidateCsv());
 
+    @Test
+    public void testNonTerminatedQuoteCharacterForLineByLineValidation() {
+        // This test covers the scenario where a quote character is opened but 
not closed before the end of the file.
+        // In such a case, there is a risk of loading the entire file into 
memory, which can lead to OOM errors.
+        // This test is focused on line-by-line validation, where each line is 
treated independently.
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.MAX_LINES_PER_ROW, "1");
+        runner.setProperty(ValidateCsv.HEADER, "false");
+        runner.setProperty(ValidateCsv.SCHEMA, "ParseInt(), 
StrNotNullOrEmpty()");
+        runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, 
ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
+
+        runner.enqueue("\"1,foo\n2,bar\n3,baz\n");
+        runner.run();
+
+        runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+
+        MockFlowFile validFF = 
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst();
+        validFF.assertAttributeEquals("count.valid.lines", "2");
+        validFF.assertAttributeEquals("count.total.lines", "3");
+        validFF.assertContentEquals("2,bar\n3,baz");
+
+        MockFlowFile invalidFF = 
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).getFirst();
+        invalidFF.assertAttributeEquals("count.invalid.lines", "1");
+        invalidFF.assertAttributeEquals("count.total.lines", "3");
+        invalidFF.assertAttributeEquals("validation.error.message", 
"unexpected end of line while reading quoted column on line 1");
+        invalidFF.assertContentEquals("\"1,foo\n");
+    }
+
+    @Test
+    public void testNonTerminatedQuoteCharacterForWholeFileValidation() {
+        // This test covers the scenario where a quote character is opened but 
not closed before the end of the file.
+        // In such a case, there is a risk of loading the entire file into 
memory, which can lead to OOM errors.
+        // This test is focused on whole file validation.
+        runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+        runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n");
+        runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+        runner.setProperty(ValidateCsv.MAX_LINES_PER_ROW, "1");
+        runner.setProperty(ValidateCsv.HEADER, "false");
+        runner.setProperty(ValidateCsv.SCHEMA, "ParseInt(), 
StrNotNullOrEmpty()");
+        runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, 
ValidateCsv.VALIDATE_WHOLE_FLOWFILE);
+
+        runner.enqueue("\"1,foo\n2,bar\n3,baz\n");
+        runner.run();
+
+        runner.assertTransferCount(ValidateCsv.REL_VALID, 0);
+        runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+
+        MockFlowFile invalidFF = 
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).getFirst();
+        invalidFF.assertAttributeNotExists("count.invalid.lines");
+        invalidFF.assertAttributeNotExists("count.total.lines");
+        invalidFF.assertAttributeEquals("validation.error.message", 
"unexpected end of line while reading quoted column on line 1");
+        invalidFF.assertContentEquals("\"1,foo\n2,bar\n3,baz\n");
+    }
+
     @Test
     public void testHeaderAndSplit() {
         runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");

Reply via email to