This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c61efecbdc NIFI-10252: Use correct field(s) for PutDatabaseRecord
update keys
c61efecbdc is described below
commit c61efecbdc0f97f57f512938b50b924adeba2c04
Author: Eduardo Fontes <[email protected]>
AuthorDate: Wed Jul 20 10:38:32 2022 -0300
NIFI-10252: Use correct field(s) for PutDatabaseRecord update keys
Not always UpdateKey comes in first position
Better readability
Include testUpdatePkNotFirst
Including usefull SQL log debug
Fix semicolon
NIFI-10252: Additional unit test for specified update keys
Signed-off-by: Matthew Burgess <[email protected]>
This closes #6226
---
.../processors/standard/PutDatabaseRecord.java | 6 +-
.../standard/TestPutDatabaseRecord.groovy | 91 ++++++++++++++++++++++
2 files changed, 96 insertions(+), 1 deletion(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 34d90ec980..c15cddca29 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -667,6 +667,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
throw new
IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s",
statementType, flowFile));
}
+ // Log debug sqlHolder
+ log.debug("Generated SQL: {}", sqlHolder.getSql());
// Create the Prepared Statement
final PreparedStatement preparedStatement =
con.prepareStatement(sqlHolder.getSql());
@@ -1200,6 +1202,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
for (int i = 0; i < fieldCount; i++) {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
+ boolean firstUpdateKey = true;
final String normalizedColName =
normalizeColumnName(fieldName, settings.translateFieldNames);
final ColumnDescription desc =
tableSchema.getColumns().get(normalizeColumnName(fieldName,
settings.translateFieldNames));
@@ -1210,9 +1213,10 @@ public class PutDatabaseRecord extends AbstractProcessor
{
if (whereFieldCount.getAndIncrement() > 0) {
sqlBuilder.append(" AND ");
- } else if (i == 0) {
+ } else if (firstUpdateKey) {
// Set the WHERE clause based on the Update Key
values
sqlBuilder.append(" WHERE ");
+ firstUpdateKey = false;
}
if (settings.escapeColumnNames) {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 04f0460d8e..233d5ec94b 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -767,6 +767,51 @@ class TestPutDatabaseRecord {
conn.close()
}
+ @Test
+ void testUpdatePkNotFirst() throws InitializationException,
ProcessException, SQLException, IOException {
+ recreateTable('CREATE TABLE PERSONS (name varchar(100), id integer
primary key, code integer)')
+ final MockRecordParser parser = new MockRecordParser()
+ runner.addControllerService("parser", parser)
+ runner.enableControllerService(parser)
+
+ parser.addSchemaField("name", RecordFieldType.STRING)
+ parser.addSchemaField("id", RecordFieldType.INT)
+ parser.addSchemaField("code", RecordFieldType.INT)
+
+ parser.addRecord('rec1', 1, 201)
+ parser.addRecord('rec2', 2, 202)
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.UPDATE_TYPE)
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+ // Set some existing records with different values for name and code
+ final Connection conn = dbcp.getConnection()
+ Statement stmt = conn.createStatement()
+ stmt.execute('''INSERT INTO PERSONS VALUES ('x1', 1, 101)''')
+ stmt.execute('''INSERT INTO PERSONS VALUES ('x2', 2, 102)''')
+ stmt.close()
+
+ runner.enqueue(new byte[0])
+ runner.run()
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+ stmt = conn.createStatement()
+ final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+ assertTrue(rs.next())
+ assertEquals('rec1', rs.getString(1))
+ assertEquals(1, rs.getInt(2))
+ assertEquals(201, rs.getInt(3))
+ assertTrue(rs.next())
+ assertEquals('rec2', rs.getString(1))
+ assertEquals(2, rs.getInt(2))
+ assertEquals(202, rs.getInt(3))
+ assertFalse(rs.next())
+
+ stmt.close()
+ conn.close()
+ }
+
@Test
void testUpdateMultipleSchemas() throws InitializationException,
ProcessException, SQLException, IOException {
// Manually create and drop the tables and schemas
@@ -966,6 +1011,52 @@ class TestPutDatabaseRecord {
conn.close()
}
+ @Test
+ void testUpdateSpecifyUpdateKeysNotFirst() throws InitializationException,
ProcessException, SQLException, IOException {
+ recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100),
code integer)')
+ final MockRecordParser parser = new MockRecordParser()
+ runner.addControllerService("parser", parser)
+ runner.enableControllerService(parser)
+
+ parser.addSchemaField("id", RecordFieldType.INT)
+ parser.addSchemaField("name", RecordFieldType.STRING)
+ parser.addSchemaField("code", RecordFieldType.INT)
+
+ parser.addRecord(1, 'rec1', 201)
+ parser.addRecord(2, 'rec2', 202)
+
+ runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+ runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE,
PutDatabaseRecord.UPDATE_TYPE)
+ runner.setProperty(PutDatabaseRecord.UPDATE_KEYS, 'code')
+ runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+ // Set some existing records with different values for name and code
+ final Connection conn = dbcp.getConnection()
+ Statement stmt = conn.createStatement()
+ stmt.execute('''INSERT INTO PERSONS VALUES (10,'x1',201)''')
+ stmt.execute('''INSERT INTO PERSONS VALUES (12,'x2',202)''')
+ stmt.close()
+
+ runner.enqueue(new byte[0])
+ runner.run()
+
+ runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+ stmt = conn.createStatement()
+ final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+ assertTrue(rs.next())
+ assertEquals(1, rs.getInt(1))
+ assertEquals('rec1', rs.getString(2))
+ assertEquals(201, rs.getInt(3))
+ assertTrue(rs.next())
+ assertEquals(2, rs.getInt(1))
+ assertEquals('rec2', rs.getString(2))
+ assertEquals(202, rs.getInt(3))
+ assertFalse(rs.next())
+
+ stmt.close()
+ conn.close()
+ }
+
@Test
void testUpdateSpecifyQuotedUpdateKeys() throws InitializationException,
ProcessException, SQLException, IOException {
recreateTable('CREATE TABLE PERSONS ("id" integer, name varchar(100),
code integer)')