This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new 748a3149a1 NIFI-12014 NullPointerException in PutSQL when adding error attributes 748a3149a1 is described below commit 748a3149a180f14ac50aca017462540f105f39de Author: krisztina-zsihovszki <zsikr...@gmail.com> AuthorDate: Thu Aug 31 18:29:04 2023 +0200 NIFI-12014 NullPointerException in PutSQL when adding error attributes This closes #7666. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../apache/nifi/processors/standard/PutSQL.java | 36 +++++++++++++++++----- .../nifi/processors/standard/TestPutSQL.java | 34 +++++++++++++++++--- 2 files changed, 59 insertions(+), 11 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index a8d9a95e3c..4c365c91f2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.standard; +import java.util.Optional; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; @@ -75,9 +76,9 @@ import java.util.function.BiFunction; import static java.lang.String.format; import static java.lang.String.valueOf; +import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.stream.Collectors.toList; -import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Failure; import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError; @SupportsBatching @@ -462,15 +463,16 @@ public class PutSQL extends AbstractSessionFactoryProcessor { private ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError(final ProcessContext context, final ProcessSession session, final RoutingResult result) { ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError = createOnError(context, session, result, REL_FAILURE, REL_RETRY); onFlowFileError = onFlowFileError.andThen((ctx, flowFile, errorTypesResult, exception) -> { - flowFile = addErrorAttributesToFlowFile(session, flowFile, exception); switch (errorTypesResult.destination()) { case Failure: getLogger().error("Failed to update database for {} due to {}; routing to failure", flowFile, exception, exception); + addErrorAttributesToFlowFile(session, flowFile, exception); break; case Retry: getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", flowFile, exception, exception); + addErrorAttributesToFlowFile(session, flowFile, exception); break; case Self: getLogger().error("Failed to update database for {} due to {};", flowFile, exception, exception); @@ -485,14 +487,26 @@ public class PutSQL extends AbstractSessionFactoryProcessor { ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY); onGroupError = onGroupError.andThen((ctx, flowFileGroup, errorTypesResult, exception) -> { - Relationship relationship = errorTypesResult.destination() == Failure ? REL_FAILURE : REL_RETRY; - List<FlowFile> flowFilesToRelationship = result.getRoutedFlowFiles().get(relationship); - result.getRoutedFlowFiles().put(relationship, addErrorAttributesToFlowFilesInGroup(session, flowFilesToRelationship, flowFileGroup.getFlowFiles(), exception)); + switch (errorTypesResult.destination()) { + case Failure: + List<FlowFile> flowFilesToFailure = getFlowFilesOnRelationship(result, REL_FAILURE); + result.getRoutedFlowFiles().put(REL_FAILURE, addErrorAttributesToFlowFilesInGroup(session, flowFilesToFailure, flowFileGroup.getFlowFiles(), exception)); + break; + case Retry: + List<FlowFile> flowFilesToRetry = getFlowFilesOnRelationship(result, REL_RETRY); + result.getRoutedFlowFiles().put(REL_RETRY, addErrorAttributesToFlowFilesInGroup(session, flowFilesToRetry, flowFileGroup.getFlowFiles(), exception)); + break; + } }); return onGroupError; } + private List<FlowFile> getFlowFilesOnRelationship(RoutingResult result, final Relationship relationship) { + return Optional.ofNullable(result.getRoutedFlowFiles().get(relationship)) + .orElse(emptyList()); + } + private List<FlowFile> addErrorAttributesToFlowFilesInGroup(ProcessSession session, List<FlowFile> flowFilesOnRelationship, List<FlowFile> flowFilesInGroup, Exception exception) { return flowFilesOnRelationship.stream() .map(ff -> flowFilesInGroup.contains(ff) ? addErrorAttributesToFlowFile(session, ff, exception) : ff) @@ -861,8 +875,16 @@ public class PutSQL extends AbstractSessionFactoryProcessor { attributes.put(ERROR_MESSAGE_ATTR, exception.getMessage()); if (exception instanceof SQLException) { - attributes.put(ERROR_CODE_ATTR, valueOf(((SQLException) exception).getErrorCode())); - attributes.put(ERROR_SQL_STATE_ATTR, valueOf(((SQLException) exception).getSQLState())); + int errorCode = ((SQLException) exception).getErrorCode(); + String sqlState = ((SQLException) exception).getSQLState(); + + if (errorCode > 0) { + attributes.put(ERROR_CODE_ATTR, valueOf(errorCode)); + } + + if (sqlState != null) { + attributes.put(ERROR_SQL_STATE_ATTR, sqlState); + } } return session.putAllAttributes(flowFile, attributes); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index ba09b85d63..b4433611b9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -397,6 +397,32 @@ public class TestPutSQL { runner.assertTransferCount(PutSQL.REL_SUCCESS, 0); } + @Test + public void testFailInMiddleWithNumberFormatException() throws InitializationException, ProcessException { + final TestRunner runner = initTestRunner(); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "false"); + runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false"); + final Map<String, String> goodAttributes = new HashMap<>(); + goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("sql.args.1.value", "84"); + + final Map<String, String> badAttributes = new HashMap<>(); + badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER)); + badAttributes.put("sql.args.1.value", "hello"); + + final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + + runner.run(); + + runner.assertTransferCount(PutSQL.REL_FAILURE, 1); + runner.assertTransferCount(PutSQL.REL_SUCCESS, 3); + assertNonSQLErrorRelatedAttributes(runner, PutSQL.REL_FAILURE); + } + @Test public void testFailInMiddleWithBadParameterValueAndSupportTransaction() throws InitializationException, ProcessException, SQLException { final TestRunner runner = initTestRunner(); @@ -1260,7 +1286,7 @@ public class TestPutSQL { // should fail because of the semicolon runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1); - assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_RETRY); + assertNonSQLErrorRelatedAttributes(runner, PutSQL.REL_RETRY); } @Test @@ -1411,7 +1437,7 @@ public class TestPutSQL { // No FlowFiles should be transferred because there were not enough FlowFiles with the same fragment identifier runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1); - assertNonSQLExceptionRelatedAttribute(runner); + assertNonSQLErrorRelatedAttributes(runner, PutSQL.REL_FAILURE); } @Test @@ -1740,8 +1766,8 @@ public class TestPutSQL { }); } - private static void assertNonSQLExceptionRelatedAttribute(final TestRunner runner) { - List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSQL.REL_FAILURE); + private static void assertNonSQLErrorRelatedAttributes(final TestRunner runner, Relationship relationship) { + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(relationship); flowFiles.forEach(ff -> { ff.assertAttributeExists("error.message"); });