This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 748a3149a1 NIFI-12014 NullPointerException in PutSQL when adding error attributes
748a3149a1 is described below

commit 748a3149a180f14ac50aca017462540f105f39de
Author: krisztina-zsihovszki <zsikr...@gmail.com>
AuthorDate: Thu Aug 31 18:29:04 2023 +0200

    NIFI-12014 NullPointerException in PutSQL when adding error attributes
    
    This closes #7666.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../apache/nifi/processors/standard/PutSQL.java    | 36 +++++++++++++++++-----
 .../nifi/processors/standard/TestPutSQL.java       | 34 +++++++++++++++++---
 2 files changed, 59 insertions(+), 11 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index a8d9a95e3c..4c365c91f2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.util.Optional;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -75,9 +76,9 @@ import java.util.function.BiFunction;
 
 import static java.lang.String.format;
 import static java.lang.String.valueOf;
+import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.stream.Collectors.toList;
-import static 
org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Failure;
 import static 
org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError;
 
 @SupportsBatching
@@ -462,15 +463,16 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
     private ExceptionHandler.OnError<FunctionContext, FlowFile> 
onFlowFileError(final ProcessContext context, final ProcessSession session, 
final RoutingResult result) {
         ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError = 
createOnError(context, session, result, REL_FAILURE, REL_RETRY);
         onFlowFileError = onFlowFileError.andThen((ctx, flowFile, 
errorTypesResult, exception) -> {
-            flowFile = addErrorAttributesToFlowFile(session, flowFile, 
exception);
 
             switch (errorTypesResult.destination()) {
                 case Failure:
                     getLogger().error("Failed to update database for {} due to 
{}; routing to failure", flowFile, exception, exception);
+                    addErrorAttributesToFlowFile(session, flowFile, exception);
                     break;
                 case Retry:
                     getLogger().error("Failed to update database for {} due to 
{}; it is possible that retrying the operation will succeed, so routing to 
retry",
                            flowFile, exception, exception);
+                    addErrorAttributesToFlowFile(session, flowFile, exception);
                     break;
                 case Self:
                     getLogger().error("Failed to update database for {} due to 
{};",  flowFile, exception, exception);
@@ -485,14 +487,26 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
                 ExceptionHandler.createOnGroupError(context, session, result, 
REL_FAILURE, REL_RETRY);
 
         onGroupError = onGroupError.andThen((ctx, flowFileGroup, 
errorTypesResult, exception) -> {
-            Relationship relationship = errorTypesResult.destination() == 
Failure ? REL_FAILURE : REL_RETRY;
-            List<FlowFile> flowFilesToRelationship = 
result.getRoutedFlowFiles().get(relationship);
-            result.getRoutedFlowFiles().put(relationship, 
addErrorAttributesToFlowFilesInGroup(session, flowFilesToRelationship, 
flowFileGroup.getFlowFiles(), exception));
+            switch (errorTypesResult.destination()) {
+                case Failure:
+                    List<FlowFile> flowFilesToFailure = 
getFlowFilesOnRelationship(result, REL_FAILURE);
+                    result.getRoutedFlowFiles().put(REL_FAILURE, 
addErrorAttributesToFlowFilesInGroup(session, flowFilesToFailure, 
flowFileGroup.getFlowFiles(), exception));
+                    break;
+                case Retry:
+                    List<FlowFile> flowFilesToRetry = 
getFlowFilesOnRelationship(result, REL_RETRY);
+                    result.getRoutedFlowFiles().put(REL_RETRY, 
addErrorAttributesToFlowFilesInGroup(session, flowFilesToRetry, 
flowFileGroup.getFlowFiles(), exception));
+                    break;
+            }
         });
 
         return onGroupError;
     }
 
+    private List<FlowFile> getFlowFilesOnRelationship(RoutingResult result, 
final Relationship relationship) {
+        return 
Optional.ofNullable(result.getRoutedFlowFiles().get(relationship))
+                .orElse(emptyList());
+    }
+
     private List<FlowFile> addErrorAttributesToFlowFilesInGroup(ProcessSession 
session, List<FlowFile> flowFilesOnRelationship, List<FlowFile> 
flowFilesInGroup, Exception exception) {
         return flowFilesOnRelationship.stream()
                     .map(ff ->  flowFilesInGroup.contains(ff) ? 
addErrorAttributesToFlowFile(session, ff, exception) : ff)
@@ -861,8 +875,16 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
         attributes.put(ERROR_MESSAGE_ATTR, exception.getMessage());
 
         if (exception instanceof SQLException) {
-            attributes.put(ERROR_CODE_ATTR, valueOf(((SQLException) 
exception).getErrorCode()));
-            attributes.put(ERROR_SQL_STATE_ATTR, valueOf(((SQLException) 
exception).getSQLState()));
+            int errorCode = ((SQLException) exception).getErrorCode();
+            String sqlState = ((SQLException) exception).getSQLState();
+
+            if (errorCode > 0) {
+                attributes.put(ERROR_CODE_ATTR, valueOf(errorCode));
+            }
+
+            if (sqlState != null) {
+                attributes.put(ERROR_SQL_STATE_ATTR, sqlState);
+            }
         }
 
         return session.putAllAttributes(flowFile, attributes);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
index ba09b85d63..b4433611b9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -397,6 +397,32 @@ public class TestPutSQL {
         runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
     }
 
+    @Test
+    public void testFailInMiddleWithNumberFormatException() throws 
InitializationException, ProcessException {
+        final TestRunner runner = initTestRunner();
+        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "false");
+        runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
+        final Map<String, String> goodAttributes = new HashMap<>();
+        goodAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        goodAttributes.put("sql.args.1.value", "84");
+
+        final Map<String, String> badAttributes = new HashMap<>();
+        badAttributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
+        badAttributes.put("sql.args.1.value", "hello");
+
+        final byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES 
('Mark', ?)".getBytes();
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, badAttributes);
+        runner.enqueue(data, goodAttributes);
+        runner.enqueue(data, goodAttributes);
+
+        runner.run();
+
+        runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
+        runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
+        assertNonSQLErrorRelatedAttributes(runner, PutSQL.REL_FAILURE);
+    }
+
     @Test
     public void testFailInMiddleWithBadParameterValueAndSupportTransaction() 
throws InitializationException, ProcessException, SQLException {
         final TestRunner runner = initTestRunner();
@@ -1260,7 +1286,7 @@ public class TestPutSQL {
 
         // should fail because of the semicolon
         runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1);
-        assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_RETRY);
+        assertNonSQLErrorRelatedAttributes(runner, PutSQL.REL_RETRY);
     }
 
     @Test
@@ -1411,7 +1437,7 @@ public class TestPutSQL {
 
         // No FlowFiles should be transferred because there were not enough 
FlowFiles with the same fragment identifier
         runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
-        assertNonSQLExceptionRelatedAttribute(runner);
+        assertNonSQLErrorRelatedAttributes(runner, PutSQL.REL_FAILURE);
     }
 
     @Test
@@ -1740,8 +1766,8 @@ public class TestPutSQL {
         });
     }
 
-    private static void assertNonSQLExceptionRelatedAttribute(final TestRunner 
runner) {
-        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutSQL.REL_FAILURE);
+    private static void assertNonSQLErrorRelatedAttributes(final TestRunner 
runner,  Relationship relationship) {
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(relationship);
         flowFiles.forEach(ff -> {
             ff.assertAttributeExists("error.message");
         });

Reply via email to