This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8174854347 NIFI-14942 [ValidateCSV] Add 'Max Lines Per Row' property.
8174854347 is described below
commit 81748543475f9c080be81e52ea4df3d9186faa4f
Author: Filip Maretic <[email protected]>
AuthorDate: Fri Sep 5 16:59:55 2025 +0200
NIFI-14942 [ValidateCSV] Add 'Max Lines Per Row' property.
Signed-off-by: Pierre Villard <[email protected]>
This closes #10279.
---
.../nifi/processors/standard/ValidateCsv.java | 40 +++++++++++++--
.../nifi/processors/standard/TestValidateCsv.java | 57 ++++++++++++++++++++++
2 files changed, 94 insertions(+), 3 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
index aa9bba5611..ac063b99c7 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
@@ -142,6 +142,20 @@ public class ValidateCsv extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ /**
+  * Upper bound on the number of physical lines a single CSV row may span. The value is read
+  * with {@code asInteger()} and handed to {@code CsvPreference.Builder#maxLinesPerRow} when the
+  * parser preferences are built. Only non-negative integers are accepted; the default of
+  * {@code 0} disables the limit, as stated in the property description.
+  */
+ public static final PropertyDescriptor MAX_LINES_PER_ROW = new
PropertyDescriptor.Builder()
+ .name("Max Lines Per Row")
+ .description("""
+ The maximum number of lines that a row can span before an
exception is thrown. This option allows
+ the processor to fail fast when encountering CSV with
mismatching quotes - the normal behaviour
+ would be to continue reading until the matching quote is
found, which could potentially mean reading
+ the whole file (and exhausting all available memory). Zero
value will disable this option.
+ """)
+ .required(true)
+ .defaultValue("0")
+ // NONE: unlike the delimiter/quote/end-of-line properties, this value is read without
+ // evaluateAttributeExpressions(flowFile), so it cannot vary per FlowFile.
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
public static final PropertyDescriptor DELIMITER_CHARACTER = new
PropertyDescriptor.Builder()
.name("validate-csv-delimiter")
.displayName("Delimiter character")
@@ -200,6 +214,7 @@ public class ValidateCsv extends AbstractProcessor {
HEADER,
DELIMITER_CHARACTER,
QUOTE_CHARACTER,
+ MAX_LINES_PER_ROW,
END_OF_LINE_CHARACTER,
VALIDATION_STRATEGY,
INCLUDE_ALL_VIOLATIONS
@@ -265,9 +280,28 @@ public class ValidateCsv extends AbstractProcessor {
// input is transferred over to Java as is. So when you type the
characters "\"
// and "n" into the UI the Java string will end up being those two
characters
// not the interpreted value "\n".
- final String msgDemarcator =
context.getProperty(END_OF_LINE_CHARACTER).evaluateAttributeExpressions(flowFile).getValue().replace("\\n",
"\n").replace("\\r", "\r").replace("\\t", "\t");
- return new
CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).evaluateAttributeExpressions(flowFile).getValue().charAt(0),
-
context.getProperty(DELIMITER_CHARACTER).evaluateAttributeExpressions(flowFile).getValue().charAt(0),
msgDemarcator).build();
+ final String msgDemarcator = context.getProperty(END_OF_LINE_CHARACTER)
+ .evaluateAttributeExpressions(flowFile)
+ .getValue()
+ .replace("\\n", "\n")
+ .replace("\\r", "\r")
+ .replace("\\t", "\t");
+
+ final char quoteChar = context.getProperty(QUOTE_CHARACTER)
+ .evaluateAttributeExpressions(flowFile)
+ .getValue()
+ .charAt(0);
+
+ final int delimiterChar = context.getProperty(DELIMITER_CHARACTER)
+ .evaluateAttributeExpressions(flowFile)
+ .getValue()
+ .charAt(0);
+
+ final int maxLinesPerRow =
context.getProperty(MAX_LINES_PER_ROW).asInteger();
+
+ return new CsvPreference.Builder(quoteChar, delimiterChar,
msgDemarcator)
+ .maxLinesPerRow(maxLinesPerRow)
+ .build();
}
/**
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
index 43c92263d0..ab22f2a79e 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
@@ -28,6 +28,63 @@ public class TestValidateCsv {
private final TestRunner runner = TestRunners.newTestRunner(new
ValidateCsv());
+ @Test
+ public void testNonTerminatedQuoteCharacterForLineByLineValidation() {
+ // This test covers the scenario where a quote character is opened but
not closed before the end of the file.
+ // In such a case, there is a risk of loading the entire file into
memory, which can lead to OOM errors.
+ // This test is focused on line-by-line validation, where each line is
treated independently.
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ // Allow each row to span only one physical line, so the unterminated quote on line 1
+ // is reported on that line instead of the parser reading on into lines 2 and 3.
+ runner.setProperty(ValidateCsv.MAX_LINES_PER_ROW, "1");
+ runner.setProperty(ValidateCsv.HEADER, "false");
+ runner.setProperty(ValidateCsv.SCHEMA, "ParseInt(),
StrNotNullOrEmpty()");
+ runner.setProperty(ValidateCsv.VALIDATION_STRATEGY,
ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
+
+ // Line 1 opens a quote that is never closed; lines 2 and 3 are well-formed.
+ runner.enqueue("\"1,foo\n2,bar\n3,baz\n");
+ runner.run();
+
+ // Expect the input to be split: the two well-formed lines go to valid and the
+ // malformed first line goes to invalid.
+ runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+
+ MockFlowFile validFF =
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst();
+ validFF.assertAttributeEquals("count.valid.lines", "2");
+ validFF.assertAttributeEquals("count.total.lines", "3");
+ validFF.assertContentEquals("2,bar\n3,baz");
+
+ MockFlowFile invalidFF =
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).getFirst();
+ invalidFF.assertAttributeEquals("count.invalid.lines", "1");
+ invalidFF.assertAttributeEquals("count.total.lines", "3");
+ invalidFF.assertAttributeEquals("validation.error.message",
"unexpected end of line while reading quoted column on line 1");
+ invalidFF.assertContentEquals("\"1,foo\n");
+ }
+
+ @Test
+ public void testNonTerminatedQuoteCharacterForWholeFileValidation() {
+ // This test covers the scenario where a quote character is opened but
not closed before the end of the file.
+ // In such a case, there is a risk of loading the entire file into
memory, which can lead to OOM errors.
+ // This test is focused on whole file validation.
+ runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
+ runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n");
+ runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
+ // Allow each row to span only one physical line, so the unterminated quote on line 1
+ // fails on that line rather than the parser reading through the rest of the input.
+ runner.setProperty(ValidateCsv.MAX_LINES_PER_ROW, "1");
+ runner.setProperty(ValidateCsv.HEADER, "false");
+ runner.setProperty(ValidateCsv.SCHEMA, "ParseInt(),
StrNotNullOrEmpty()");
+ runner.setProperty(ValidateCsv.VALIDATION_STRATEGY,
ValidateCsv.VALIDATE_WHOLE_FLOWFILE);
+
+ // Line 1 opens a quote that is never closed; lines 2 and 3 are well-formed.
+ runner.enqueue("\"1,foo\n2,bar\n3,baz\n");
+ runner.run();
+
+ // With whole-flowfile validation, the single malformed row sends the entire, unmodified
+ // input to invalid, and no per-line count attributes are written.
+ runner.assertTransferCount(ValidateCsv.REL_VALID, 0);
+ runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+
+ MockFlowFile invalidFF =
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).getFirst();
+ invalidFF.assertAttributeNotExists("count.invalid.lines");
+ invalidFF.assertAttributeNotExists("count.total.lines");
+ invalidFF.assertAttributeEquals("validation.error.message",
"unexpected end of line while reading quoted column on line 1");
+ invalidFF.assertContentEquals("\"1,foo\n2,bar\n3,baz\n");
+ }
+
@Test
public void testHeaderAndSplit() {
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");