This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new dda42824b6 NIFI-13199 Update ValidateRecord to avoid writing to FlowFiles that will be auto-terminated
dda42824b6 is described below

commit dda42824b616e5ef448c04672447663b0a621f04
Author: Jim Steinebrey <jrsteineb...@gmail.com>
AuthorDate: Fri Jun 7 20:44:55 2024 -0400

    NIFI-13199 Update ValidateRecord to avoid writing to FlowFiles that will be auto-terminated
    
    Signed-off-by: Matt Burgess <mattyb...@apache.org>
    
    This closes #8942
---
 .../nifi/processors/standard/ValidateRecord.java   | 96 ++++++++++++----------
 1 file changed, 53 insertions(+), 43 deletions(-)

diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
index c9edc2e280..672399dd0f 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
@@ -324,64 +324,64 @@ public class ValidateRecord extends AbstractProcessor {
                     final SchemaValidationResult result = 
validator.validate(record);
                     recordCount++;
 
-                    RecordSetWriter writer;
                     if (result.isValid()) {
                         validCount++;
                         if (validFlowFile == null) {
                             validFlowFile = session.create(flowFile);
                         }
 
-                        validWriter = writer = createIfNecessary(validWriter, 
validRecordWriterFactory, session, validFlowFile, validationSchema);
+                        validWriter = createIfNecessary(validWriter, 
validRecordWriterFactory, session, validFlowFile, validationSchema);
 
+                        writeRecord(validWriter, record);
                     } else {
                         invalidCount++;
                         logValidationErrors(flowFile, recordCount, result);
 
-                        if (invalidFlowFile == null) {
-                            invalidFlowFile = session.create(flowFile);
-                        }
-
-                        invalidWriter = writer = 
createIfNecessary(invalidWriter, invalidRecordWriterFactory, session, 
invalidFlowFile, record.getSchema());
-
-                        // Add all of the validation errors to our 
Set<ValidationError> but only keep up to MAX_VALIDATION_ERRORS because if
-                        // we keep too many then we both use up a lot of heap 
and risk outputting so much information in the Provenance Event
-                        // that it is too noisy to be useful.
-                        for (final ValidationError validationError : 
result.getValidationErrors()) {
-                            final Optional<String> fieldName = 
validationError.getFieldName();
+                        if (!context.isAutoTerminated(REL_INVALID)) {
+                            // If REL_INVALID is not autoTerminated, then 
create a flow file and calculate the invalid details.
+                            // If it is autoTerminated, then skip doing work 
which will just be discarded.
+                            if (invalidFlowFile == null) {
+                                invalidFlowFile = session.create(flowFile);
+                            }
 
-                            switch (validationError.getType()) {
-                                case EXTRA_FIELD:
-                                    if (fieldName.isPresent()) {
-                                        extraFields.add(fieldName.get());
-                                    } else {
+                            invalidWriter = createIfNecessary(invalidWriter, 
invalidRecordWriterFactory, session, invalidFlowFile, record.getSchema());
+
+                            // Add all of the validation errors to our 
Set<ValidationError> but only keep up to MAX_VALIDATION_ERRORS because if
+                            // we keep too many then we both use up a lot of 
heap and risk outputting so much information in the Provenance Event
+                            // that it is too noisy to be useful.
+                            for (final ValidationError validationError : 
result.getValidationErrors()) {
+                                final Optional<String> fieldName = 
validationError.getFieldName();
+
+                                switch (validationError.getType()) {
+                                    case EXTRA_FIELD:
+                                        if (fieldName.isPresent()) {
+                                            extraFields.add(fieldName.get());
+                                        } else {
+                                            
otherProblems.add(validationError.getExplanation());
+                                        }
+                                        break;
+                                    case MISSING_FIELD:
+                                        if (fieldName.isPresent()) {
+                                            missingFields.add(fieldName.get());
+                                        } else {
+                                            
otherProblems.add(validationError.getExplanation());
+                                        }
+                                        break;
+                                    case INVALID_FIELD:
+                                        if (fieldName.isPresent()) {
+                                            invalidFields.add(fieldName.get());
+                                        } else {
+                                            
otherProblems.add(validationError.getExplanation());
+                                        }
+                                        break;
+                                    case OTHER:
                                         
otherProblems.add(validationError.getExplanation());
-                                    }
-                                    break;
-                                case MISSING_FIELD:
-                                    if (fieldName.isPresent()) {
-                                        missingFields.add(fieldName.get());
-                                    } else {
-                                        
otherProblems.add(validationError.getExplanation());
-                                    }
-                                    break;
-                                case INVALID_FIELD:
-                                    if (fieldName.isPresent()) {
-                                        invalidFields.add(fieldName.get());
-                                    } else {
-                                        
otherProblems.add(validationError.getExplanation());
-                                    }
-                                    break;
-                                case OTHER:
-                                    
otherProblems.add(validationError.getExplanation());
-                                    break;
+                                        break;
+                                }
                             }
-                        }
-                    }
 
-                    if (writer instanceof RawRecordWriter) {
-                        ((RawRecordWriter) writer).writeRawRecord(record);
-                    } else {
-                        writer.write(record);
+                            writeRecord(invalidWriter, record);
+                        }
                     }
                 }
 
@@ -450,6 +450,16 @@ public class ValidateRecord extends AbstractProcessor {
         session.remove(flowFile);
     }
 
+    private void writeRecord(final RecordSetWriter writer, final Record 
record) throws IOException {
+        if (writer != null) {
+            if (writer instanceof RawRecordWriter) {
+                ((RawRecordWriter) writer).writeRawRecord(record);
+            } else {
+                writer.write(record);
+            }
+        }
+    }
+
     private void closeQuietly(final RecordSetWriter writer) {
         if (writer != null) {
             try {

Reply via email to