This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new bc95799a39 NIFI-13397 Updated PutDatabaseRecord to retry transient ProcessException causes
bc95799a39 is described below

commit bc95799a397eaf1fda6a0db4927724b85bd2a496
Author: Jim Steinebrey <jrsteineb...@gmail.com>
AuthorDate: Thu Jun 13 12:27:17 2024 -0400

    NIFI-13397 Updated PutDatabaseRecord to retry transient ProcessException causes
    
    This closes #8964
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../processors/standard/PutDatabaseRecord.java     |  2 +-
 .../processors/standard/PutDatabaseRecordTest.java | 70 ++++++++++++++++++++++
 2 files changed, 71 insertions(+), 1 deletion(-)

diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index e0dbf5b38b..3210c609a4 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -585,7 +585,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
         // When an Exception is thrown, we want to route to 'retry' if we expect that attempting the same request again
         // might work. Otherwise, route to failure. SQLTransientException is a specific type that indicates that a retry may work.
         final Relationship relationship;
-        final Throwable toAnalyze = (e instanceof BatchUpdateException) ? e.getCause() : e;
+        final Throwable toAnalyze = (e instanceof BatchUpdateException || e instanceof ProcessException) ? e.getCause() : e;
         if (toAnalyze instanceof SQLTransientException) {
             relationship = REL_RETRY;
             flowFile = session.penalize(flowFile);
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index f2678bb3ae..dd004e7db7 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -58,6 +58,7 @@ import java.sql.ResultSet;
 import java.sql.SQLDataException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLTransientException;
 import java.sql.Statement;
 import java.time.LocalDate;
 import java.time.ZoneOffset;
@@ -263,6 +264,57 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
+    public void testProcessExceptionRouteRetry() throws InitializationException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_1.getTestCase());
+
+        // This exception should route to REL_RETRY because its cause is SQLTransientException
+        dbcp = new DBCPServiceThrowConnectionException(new SQLTransientException("connection failed"));
+        final Map<String, String> dbcpProperties = new HashMap<>();
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
+
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_RETRY);
+    }
+
+    @Test
+    public void testProcessExceptionRouteFailure() throws InitializationException, SQLException {
+        setRunner(TestCaseEnum.DEFAULT_1.getTestCase());
+
+        // This exception should route to REL_FAILURE because its cause is NOT SQLTransientException
+        dbcp = new DBCPServiceThrowConnectionException(new NullPointerException("connection is null"));
+        final Map<String, String> dbcpProperties = new HashMap<>();
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
+
+        final MockRecordParser parser = new MockRecordParser();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE);
+    }
+
     public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException {
         setRunner(TestCaseEnum.DEFAULT_2.getTestCase());
 
@@ -2335,6 +2387,24 @@ public class PutDatabaseRecordTest {
         }
     }
 
+    static class DBCPServiceThrowConnectionException extends AbstractControllerService implements DBCPService {
+        private final Exception rootCause;
+
+        public DBCPServiceThrowConnectionException(final Exception rootCause) {
+            this.rootCause = rootCause;
+        }
+
+        @Override
+        public String getIdentifier() {
+            return DBCP_SERVICE_ID;
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            throw new ProcessException(rootCause);
+        }
+    }
+
     static class DBCPServiceAutoCommitTest extends AbstractControllerService implements DBCPService {
         private final String databaseLocation;
 

Reply via email to