This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c61efecbdc NIFI-10252: Use correct field(s) for PutDatabaseRecord 
update keys
c61efecbdc is described below

commit c61efecbdc0f97f57f512938b50b924adeba2c04
Author: Eduardo Fontes <[email protected]>
AuthorDate: Wed Jul 20 10:38:32 2022 -0300

    NIFI-10252: Use correct field(s) for PutDatabaseRecord update keys
    
    Not always UpdateKey comes in first position
    
    Better readability
    
    Include testUpdatePkNotFirst
    
    Including usefull SQL log debug
    
    Fix semicolon
    
    NIFI-10252: Additional unit test for specified update keys
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #6226
---
 .../processors/standard/PutDatabaseRecord.java     |  6 +-
 .../standard/TestPutDatabaseRecord.groovy          | 91 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 1 deletion(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 34d90ec980..c15cddca29 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -667,6 +667,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
                             throw new 
IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", 
statementType, flowFile));
                         }
 
+                        // Log debug sqlHolder
+                        log.debug("Generated SQL: {}", sqlHolder.getSql());
                         // Create the Prepared Statement
                         final PreparedStatement preparedStatement = 
con.prepareStatement(sqlHolder.getSql());
 
@@ -1200,6 +1202,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
             for (int i = 0; i < fieldCount; i++) {
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
+                boolean firstUpdateKey = true;
 
                 final String normalizedColName = 
normalizeColumnName(fieldName, settings.translateFieldNames);
                 final ColumnDescription desc = 
tableSchema.getColumns().get(normalizeColumnName(fieldName, 
settings.translateFieldNames));
@@ -1210,9 +1213,10 @@ public class PutDatabaseRecord extends AbstractProcessor 
{
 
                         if (whereFieldCount.getAndIncrement() > 0) {
                             sqlBuilder.append(" AND ");
-                        } else if (i == 0) {
+                        } else if (firstUpdateKey) {
                             // Set the WHERE clause based on the Update Key 
values
                             sqlBuilder.append(" WHERE ");
+                            firstUpdateKey = false;
                         }
 
                         if (settings.escapeColumnNames) {
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 04f0460d8e..233d5ec94b 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -767,6 +767,51 @@ class TestPutDatabaseRecord {
         conn.close()
     }
 
+    @Test
+    void testUpdatePkNotFirst() throws InitializationException, 
ProcessException, SQLException, IOException {
+        recreateTable('CREATE TABLE PERSONS (name varchar(100), id integer 
primary key, code integer)')
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("code", RecordFieldType.INT)
+
+        parser.addRecord('rec1', 1, 201)
+        parser.addRecord('rec2', 2, 202)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.UPDATE_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        // Set some existing records with different values for name and code
+        final Connection conn = dbcp.getConnection()
+        Statement stmt = conn.createStatement()
+        stmt.execute('''INSERT INTO PERSONS VALUES ('x1', 1, 101)''')
+        stmt.execute('''INSERT INTO PERSONS VALUES ('x2', 2, 102)''')
+        stmt.close()
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals('rec1', rs.getString(1))
+        assertEquals(1, rs.getInt(2))
+        assertEquals(201, rs.getInt(3))
+        assertTrue(rs.next())
+        assertEquals('rec2', rs.getString(1))
+        assertEquals(2, rs.getInt(2))
+        assertEquals(202, rs.getInt(3))
+        assertFalse(rs.next())
+
+        stmt.close()
+        conn.close()
+    }
+    
     @Test
     void testUpdateMultipleSchemas() throws InitializationException, 
ProcessException, SQLException, IOException {
         // Manually create and drop the tables and schemas
@@ -966,6 +1011,52 @@ class TestPutDatabaseRecord {
         conn.close()
     }
 
+    @Test
+    void testUpdateSpecifyUpdateKeysNotFirst() throws InitializationException, 
ProcessException, SQLException, IOException {
+        recreateTable('CREATE TABLE PERSONS (id integer, name varchar(100), 
code integer)')
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+
+        parser.addRecord(1, 'rec1', 201)
+        parser.addRecord(2, 'rec2', 202)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.UPDATE_TYPE)
+        runner.setProperty(PutDatabaseRecord.UPDATE_KEYS, 'code')
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        // Set some existing records with different values for name and code
+        final Connection conn = dbcp.getConnection()
+        Statement stmt = conn.createStatement()
+        stmt.execute('''INSERT INTO PERSONS VALUES (10,'x1',201)''')
+        stmt.execute('''INSERT INTO PERSONS VALUES (12,'x2',202)''')
+        stmt.close()
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        assertEquals('rec1', rs.getString(2))
+        assertEquals(201, rs.getInt(3))
+        assertTrue(rs.next())
+        assertEquals(2, rs.getInt(1))
+        assertEquals('rec2', rs.getString(2))
+        assertEquals(202, rs.getInt(3))
+        assertFalse(rs.next())
+
+        stmt.close()
+        conn.close()
+    }
+
     @Test
     void testUpdateSpecifyQuotedUpdateKeys() throws InitializationException, 
ProcessException, SQLException, IOException {
         recreateTable('CREATE TABLE PERSONS ("id" integer, name varchar(100), 
code integer)')

Reply via email to