This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7767a5b85d NIFI-10265 Route to Failure on Connect Exceptions in PutDatabaseRecord
7767a5b85d is described below
commit 7767a5b85d1633fde17414bf203f6e6e659236f6
Author: exceptionfactory <[email protected]>
AuthorDate: Thu Jul 21 22:57:02 2022 -0500
NIFI-10265 Route to Failure on Connect Exceptions in PutDatabaseRecord
Signed-off-by: Matthew Burgess <[email protected]>
This closes #6235
---
.../processors/standard/PutDatabaseRecord.java | 42 ++++++++-----
.../processors/standard/PutDatabaseRecordTest.java | 73 ++++++++++++++--------
2 files changed, 72 insertions(+), 43 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index c15cddca29..28f2a3b5e3 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -84,6 +84,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -479,10 +480,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
final DBCPService dbcpService =
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
- final Connection connection =
dbcpService.getConnection(flowFile.getAttributes());
+ Optional<Connection> connectionHolder = Optional.empty();
boolean originalAutoCommit = false;
try {
+ final Connection connection =
dbcpService.getConnection(flowFile.getAttributes());
+ connectionHolder = Optional.of(connection);
+
originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
@@ -513,25 +517,31 @@ public class PutDatabaseRecord extends AbstractProcessor {
session.transfer(flowFile, relationship);
}
- try {
- connection.rollback();
- } catch (final Exception e1) {
- getLogger().error("Failed to rollback JDBC transaction", e1);
- }
- } finally {
- if (originalAutoCommit) {
+ connectionHolder.ifPresent(connection -> {
try {
- connection.setAutoCommit(true);
- } catch (final Exception e) {
- getLogger().warn("Failed to set auto-commit back to true
on connection {} after finishing update", connection);
+ connection.rollback();
+ } catch (final Exception rollbackException) {
+ getLogger().error("Failed to rollback JDBC transaction",
rollbackException);
}
+ });
+ } finally {
+ if (originalAutoCommit) {
+ connectionHolder.ifPresent(connection -> {
+ try {
+ connection.setAutoCommit(true);
+ } catch (final Exception autoCommitException) {
+ getLogger().warn("Failed to set auto-commit back to
true on connection", autoCommitException);
+ }
+ });
}
- try {
- connection.close();
- } catch (final Exception e) {
- getLogger().warn("Failed to close database connection", e);
- }
+ connectionHolder.ifPresent(connection -> {
+ try {
+ connection.close();
+ } catch (final Exception closeException) {
+ getLogger().warn("Failed to close database connection",
closeException);
+ }
+ });
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index da458908b5..04206f8933 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -24,10 +24,10 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
@@ -43,25 +43,28 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.spy;
-
+import static org.mockito.Mockito.when;
public class PutDatabaseRecordTest {
+ private static final String CONNECTION_FAILED = "Connection Failed";
+
+ private static final String PARSER_ID =
MockRecordParser.class.getSimpleName();
+
+ private static final String TABLE_NAME = "PERSONS";
+
private static final String createPersons = "CREATE TABLE PERSONS (id
integer primary key, name varchar(100)," +
" code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code <
1000), dt date)";
- private static final String createPersonsSchema1 = "CREATE TABLE
SCHEMA1.PERSONS (id integer primary key, name varchar(100)," +
- " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code <
1000), dt date)";
- private static final String createPersonsSchema2 = "CREATE TABLE
SCHEMA2.PERSONS (id2 integer primary key, name varchar(100)," +
- " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code <
1000), dt date)";
private final static String DB_LOCATION = "target/db_pdr";
TestRunner runner;
PutDatabaseRecord processor;
DBCPServiceSimpleImpl dbcp;
- @BeforeClass
- public static void setupBeforeClass() throws IOException {
+ @BeforeAll
+ public static void setDatabaseLocation() {
System.setProperty("derby.stream.error.file", "target/derby.log");
// remove previous test database, if any
@@ -73,8 +76,8 @@ public class PutDatabaseRecordTest {
}
}
- @AfterClass
- public static void cleanUpAfterClass() throws Exception {
+ @AfterAll
+ public static void shutdownDatabase() throws Exception {
try {
DriverManager.getConnection("jdbc:derby:" + DB_LOCATION +
";shutdown=true");
} catch (SQLNonTransientConnectionException ignore) {
@@ -89,8 +92,8 @@ public class PutDatabaseRecordTest {
}
}
- @Before
- public void setUp() throws Exception {
+ @BeforeEach
+ public void setRunner() throws Exception {
processor = new PutDatabaseRecord();
//Mock the DBCP Controller Service so we can control the Results
dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION));
@@ -104,7 +107,25 @@ public class PutDatabaseRecordTest {
}
@Test
- public void testInsertNonRequiredColumnsUnmatchedField() throws
InitializationException, ProcessException, SQLException, IOException {
+ public void testGetConnectionFailure() throws InitializationException {
+ final MockRecordParser parser = new MockRecordParser();
+ runner.addControllerService(PARSER_ID, parser);
+ runner.enableControllerService(parser);
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, PARSER_ID);
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.INSERT_TYPE);
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, TABLE_NAME);
+
+ when(dbcp.getConnection(anyMap())).thenThrow(new
ProcessException(CONNECTION_FAILED));
+
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE);
+ }
+
+ @Test
+ public void testInsertNonRequiredColumnsUnmatchedField() throws
InitializationException, ProcessException {
// Need to override the @Before method with a new processor that
behaves badly
processor = new PutDatabaseRecordUnmatchedField();
//Mock the DBCP Controller Service so we can control the Results
@@ -117,9 +138,9 @@ public class PutDatabaseRecordTest {
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
- recreateTable(createPersons);
+ recreateTable();
final MockRecordParser parser = new MockRecordParser();
- runner.addControllerService("parser", parser);
+ runner.addControllerService(PARSER_ID, parser);
runner.enableControllerService(parser);
parser.addSchemaField("id", RecordFieldType.INT);
@@ -128,11 +149,9 @@ public class PutDatabaseRecordTest {
parser.addSchemaField("dt", RecordFieldType.DATE);
LocalDate testDate1 = LocalDate.of(2021, 1, 26);
- Date nifiDate1 = new
Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); // in
UTC
- Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
+ Date nifiDate1 = new
Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
LocalDate testDate2 = LocalDate.of(2021, 7, 26);
- Date nifiDate2 = new
Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); // in
URC
- Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ
+ Date nifiDate2 = new
Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
parser.addRecord(1, "rec1", "test", nifiDate1);
parser.addRecord(2, "rec2", "test", nifiDate2);
@@ -140,9 +159,9 @@ public class PutDatabaseRecordTest {
parser.addRecord(4, "rec4", "test", null);
parser.addRecord(5, null, null, null);
- runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, PARSER_ID);
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.INSERT_TYPE);
- runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, TABLE_NAME);
runner.setProperty(PutDatabaseRecord.UNMATCHED_FIELD_BEHAVIOR,
PutDatabaseRecord.FAIL_UNMATCHED_FIELD);
runner.enqueue(new byte[0]);
@@ -152,11 +171,11 @@ public class PutDatabaseRecordTest {
runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1);
}
- private void recreateTable(String createSQL) throws ProcessException,
SQLException {
+ private void recreateTable() throws ProcessException {
try (final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement()) {
stmt.execute("drop table PERSONS");
- stmt.execute(createSQL);
+ stmt.execute(createPersons);
} catch (SQLException ignore) {
// Do nothing, may not have existed
}
@@ -164,7 +183,7 @@ public class PutDatabaseRecordTest {
static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
@Override
- SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String
tableName, TableSchema tableSchema, DMLSettings settings) throws
IllegalArgumentException, SQLException {
+ SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String
tableName, TableSchema tableSchema, DMLSettings settings) throws
IllegalArgumentException {
return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES
(?,?,?,?)", Arrays.asList(0,1,2,3));
}
}