This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new dda42824b6 NIFI-13199 Update ValidateRecord to avoid writing to FlowFiles that will be auto-terminated dda42824b6 is described below commit dda42824b616e5ef448c04672447663b0a621f04 Author: Jim Steinebrey <jrsteineb...@gmail.com> AuthorDate: Fri Jun 7 20:44:55 2024 -0400 NIFI-13199 Update ValidateRecord to avoid writing to FlowFiles that will be auto-terminated Signed-off-by: Matt Burgess <mattyb...@apache.org> This closes #8942 --- .../nifi/processors/standard/ValidateRecord.java | 96 ++++++++++++---------- 1 file changed, 53 insertions(+), 43 deletions(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java index c9edc2e280..672399dd0f 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java @@ -324,64 +324,64 @@ public class ValidateRecord extends AbstractProcessor { final SchemaValidationResult result = validator.validate(record); recordCount++; - RecordSetWriter writer; if (result.isValid()) { validCount++; if (validFlowFile == null) { validFlowFile = session.create(flowFile); } - validWriter = writer = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, validationSchema); + validWriter = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, validationSchema); + writeRecord(validWriter, record); } else { invalidCount++; logValidationErrors(flowFile, recordCount, result); - if (invalidFlowFile == null) { - invalidFlowFile = session.create(flowFile); - } - - 
invalidWriter = writer = createIfNecessary(invalidWriter, invalidRecordWriterFactory, session, invalidFlowFile, record.getSchema()); - - // Add all of the validation errors to our Set<ValidationError> but only keep up to MAX_VALIDATION_ERRORS because if - // we keep too many then we both use up a lot of heap and risk outputting so much information in the Provenance Event - // that it is too noisy to be useful. - for (final ValidationError validationError : result.getValidationErrors()) { - final Optional<String> fieldName = validationError.getFieldName(); + if (!context.isAutoTerminated(REL_INVALID)) { + // If REL_INVALID is not autoTerminated, then create a flow file and calculate the invalid details. + // If it is autoTerminated, then skip doing work which will just be discarded. + if (invalidFlowFile == null) { + invalidFlowFile = session.create(flowFile); + } - switch (validationError.getType()) { - case EXTRA_FIELD: - if (fieldName.isPresent()) { - extraFields.add(fieldName.get()); - } else { + invalidWriter = createIfNecessary(invalidWriter, invalidRecordWriterFactory, session, invalidFlowFile, record.getSchema()); + + // Add all of the validation errors to our Set<ValidationError> but only keep up to MAX_VALIDATION_ERRORS because if + // we keep too many then we both use up a lot of heap and risk outputting so much information in the Provenance Event + // that it is too noisy to be useful. 
+ for (final ValidationError validationError : result.getValidationErrors()) { + final Optional<String> fieldName = validationError.getFieldName(); + + switch (validationError.getType()) { + case EXTRA_FIELD: + if (fieldName.isPresent()) { + extraFields.add(fieldName.get()); + } else { + otherProblems.add(validationError.getExplanation()); + } + break; + case MISSING_FIELD: + if (fieldName.isPresent()) { + missingFields.add(fieldName.get()); + } else { + otherProblems.add(validationError.getExplanation()); + } + break; + case INVALID_FIELD: + if (fieldName.isPresent()) { + invalidFields.add(fieldName.get()); + } else { + otherProblems.add(validationError.getExplanation()); + } + break; + case OTHER: otherProblems.add(validationError.getExplanation()); - } - break; - case MISSING_FIELD: - if (fieldName.isPresent()) { - missingFields.add(fieldName.get()); - } else { - otherProblems.add(validationError.getExplanation()); - } - break; - case INVALID_FIELD: - if (fieldName.isPresent()) { - invalidFields.add(fieldName.get()); - } else { - otherProblems.add(validationError.getExplanation()); - } - break; - case OTHER: - otherProblems.add(validationError.getExplanation()); - break; + break; + } } - } - } - if (writer instanceof RawRecordWriter) { - ((RawRecordWriter) writer).writeRawRecord(record); - } else { - writer.write(record); + writeRecord(invalidWriter, record); + } } } @@ -450,6 +450,16 @@ public class ValidateRecord extends AbstractProcessor { session.remove(flowFile); } + private void writeRecord(final RecordSetWriter writer, final Record record) throws IOException { + if (writer != null) { + if (writer instanceof RawRecordWriter) { + ((RawRecordWriter) writer).writeRawRecord(record); + } else { + writer.write(record); + } + } + } + private void closeQuietly(final RecordSetWriter writer) { if (writer != null) { try {