This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 05f3cad  NIFI-6370: Allow multiple SQL statements in PutDatabaseRecord
05f3cad is described below

commit 05f3cadee8ee76aa05a1d35ffb4598ddeca4f549
Author: Matthew Burgess <mattyb...@apache.org>
AuthorDate: Mon Jun 10 12:07:44 2019 -0400

    NIFI-6370: Allow multiple SQL statements in PutDatabaseRecord
    
    This closes #3528.
    
    Signed-off-by: Koji Kawamura <ijokaruma...@apache.org>
---
 .../processors/standard/PutDatabaseRecord.java     | 34 +++++++++-
 .../standard/TestPutDatabaseRecord.groovy          | 77 +++++++++++++++++++++-
 2 files changed, 107 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index d79cf3c..3046cbf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -239,6 +239,17 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
+    static final PropertyDescriptor ALLOW_MULTIPLE_STATEMENTS = new PropertyDescriptor.Builder()
+            .name("put-db-record-allow-multiple-statements")
+            .displayName("Allow Multiple SQL Statements")
+            .description("If the Statement Type is 'SQL' (as set in the statement.type attribute), this field indicates whether to split the field value by a semicolon and execute each statement "
+                    + "separately. If any statement causes an error, the entire set of statements will be rolled back. If the Statement Type is not 'SQL', this field is ignored.")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
     static final PropertyDescriptor QUOTED_IDENTIFIERS = new PropertyDescriptor.Builder()
             .name("put-db-record-quoted-identifiers")
             .displayName("Quote Column Identifiers")
@@ -309,6 +320,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
         pds.add(UNMATCHED_COLUMN_BEHAVIOR);
         pds.add(UPDATE_KEYS);
         pds.add(FIELD_CONTAINING_SQL);
+        pds.add(ALLOW_MULTIPLE_STATEMENTS);
         pds.add(QUOTED_IDENTIFIERS);
         pds.add(QUOTED_TABLE_IDENTIFIER);
         pds.add(QUERY_TIMEOUT);
@@ -404,7 +416,15 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
 
             getLogger().warn("Failed to process {} due to {}", new Object[]{inputFlowFile, e}, e);
 
-            if (e instanceof BatchUpdateException) {
+            // Check if there was a BatchUpdateException or if multiple SQL statements were being executed and one failed
+            final String statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue();
+            String statementType = statementTypeProperty;
+            if (USE_ATTR_TYPE.equals(statementTypeProperty)) {
+                statementType = inputFlowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
+            }
+
+            if (e instanceof BatchUpdateException
+                    || (SQL_TYPE.equalsIgnoreCase(statementType) && context.getProperty(ALLOW_MULTIPLE_STATEMENTS).asBoolean())) {
                 try {
                     // Although process session will move forward in order to route the failed FlowFile,
                     // database transaction should be rolled back to avoid partial batch update.
@@ -567,8 +587,16 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
                     throw new MalformedRecordException(format("Record had no (or null) value for Field Containing SQL: %s, FlowFile %s", sqlField, flowFile));
                 }
 
-                // Execute the statement as-is
-                s.execute((String) sql);
+                // Execute the statement(s) as-is
+                if (context.getProperty(ALLOW_MULTIPLE_STATEMENTS).asBoolean()) {
+                    String regex = "(?<!\\\\);";
+                    String[] sqlStatements = ((String) sql).split(regex);
+                    for (String statement : sqlStatements) {
+                        s.execute(statement);
+                    }
+                } else {
+                    s.execute((String) sql);
+                }
             }
             result.routeTo(flowFile, REL_SUCCESS);
             session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 0ddac80..f25bbb2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -53,7 +53,6 @@ import static org.junit.Assert.assertTrue
 import static org.junit.Assert.fail
 import static org.mockito.Matchers.anyMap
 import static org.mockito.Mockito.doAnswer
-import static org.mockito.Mockito.only
 import static org.mockito.Mockito.spy
 import static org.mockito.Mockito.times
 import static org.mockito.Mockito.verify
@@ -414,6 +413,82 @@ class TestPutDatabaseRecord {
     }
 
     @Test
+    void testMultipleInsertsViaSqlStatementType() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable("PERSONS", createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("sql", RecordFieldType.STRING)
+
+        parser.addRecord('''INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101);INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)''')
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+        runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql')
+        runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, 'true')
+
+        def attrs = [:]
+        attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql'
+        runner.enqueue(new byte[0], attrs)
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        final Connection conn = dbcp.getConnection()
+        final Statement stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        assertEquals('rec1', rs.getString(2))
+        assertEquals(101, rs.getInt(3))
+        assertTrue(rs.next())
+        assertEquals(2, rs.getInt(1))
+        assertEquals('rec2', rs.getString(2))
+        assertEquals(102, rs.getInt(3))
+        assertFalse(rs.next())
+
+        stmt.close()
+        conn.close()
+    }
+
+    @Test
+    void testMultipleInsertsViaSqlStatementTypeBadSQL() throws InitializationException, ProcessException, SQLException, IOException {
+        recreateTable("PERSONS", createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("sql", RecordFieldType.STRING)
+
+        parser.addRecord('''INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101);
+                        INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);
+                        INSERT INTO PERSONS2 (id, name, code) VALUES (2, 'rec2',102);''')
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+        runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql')
+        runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, 'true')
+
+        def attrs = [:]
+        attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql'
+        runner.enqueue(new byte[0], attrs)
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
+        runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
+        final Connection conn = dbcp.getConnection()
+        final Statement stmt = conn.createStatement()
+        final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        // The first two legitimate statements should have been rolled back
+        assertFalse(rs.next())
+
+        stmt.close()
+        conn.close()
+    }
+
+    @Test
     void testSqlStatementTypeNoValue() throws InitializationException, ProcessException, SQLException, IOException {
         recreateTable("PERSONS", createPersons)
         final MockRecordParser parser = new MockRecordParser()

Reply via email to