This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 05f3cad NIFI-6370: Allow multiple SQL statements in PutDatabaseRecord 05f3cad is described below commit 05f3cadee8ee76aa05a1d35ffb4598ddeca4f549 Author: Matthew Burgess <mattyb...@apache.org> AuthorDate: Mon Jun 10 12:07:44 2019 -0400 NIFI-6370: Allow multiple SQL statements in PutDatabaseRecord This closes #3528. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> --- .../processors/standard/PutDatabaseRecord.java | 34 +++++++++- .../standard/TestPutDatabaseRecord.groovy | 77 +++++++++++++++++++++- 2 files changed, 107 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index d79cf3c..3046cbf 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -239,6 +239,17 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); + static final PropertyDescriptor ALLOW_MULTIPLE_STATEMENTS = new PropertyDescriptor.Builder() + .name("put-db-record-allow-multiple-statements") + .displayName("Allow Multiple SQL Statements") + .description("If the Statement Type is 'SQL' (as set in the statement.type attribute), this field indicates whether to split the field value by a semicolon and execute each statement " + + "separately. If any statement causes an error, the entire set of statements will be rolled back. If the Statement Type is not 'SQL', this field is ignored.") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + static final PropertyDescriptor QUOTED_IDENTIFIERS = new PropertyDescriptor.Builder() .name("put-db-record-quoted-identifiers") .displayName("Quote Column Identifiers") @@ -309,6 +320,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { pds.add(UNMATCHED_COLUMN_BEHAVIOR); pds.add(UPDATE_KEYS); pds.add(FIELD_CONTAINING_SQL); + pds.add(ALLOW_MULTIPLE_STATEMENTS); pds.add(QUOTED_IDENTIFIERS); pds.add(QUOTED_TABLE_IDENTIFIER); pds.add(QUERY_TIMEOUT); @@ -404,7 +416,15 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { getLogger().warn("Failed to process {} due to {}", new Object[]{inputFlowFile, e}, e); - if (e instanceof BatchUpdateException) { + // Check if there was a BatchUpdateException or if multiple SQL statements were being executed and one failed + final String statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue(); + String statementType = statementTypeProperty; + if (USE_ATTR_TYPE.equals(statementTypeProperty)) { + statementType = inputFlowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE); + } + + if (e instanceof BatchUpdateException + || (SQL_TYPE.equalsIgnoreCase(statementType) && context.getProperty(ALLOW_MULTIPLE_STATEMENTS).asBoolean())) { try { // Although process session will move forward in order to route the failed FlowFile, // database transaction should be rolled back to avoid partial batch update. @@ -567,8 +587,16 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { throw new MalformedRecordException(format("Record had no (or null) value for Field Containing SQL: %s, FlowFile %s", sqlField, flowFile)); } - // Execute the statement as-is - s.execute((String) sql); + // Execute the statement(s) as-is + if (context.getProperty(ALLOW_MULTIPLE_STATEMENTS).asBoolean()) { + String regex = "(?<!\\\\);"; + String[] sqlStatements = ((String) sql).split(regex); + for (String statement : sqlStatements) { + s.execute(statement); + } + } else { + s.execute((String) sql); + } } result.routeTo(flowFile, REL_SUCCESS); session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy index 0ddac80..f25bbb2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy @@ -53,7 +53,6 @@ import static org.junit.Assert.assertTrue import static org.junit.Assert.fail import static org.mockito.Matchers.anyMap import static org.mockito.Mockito.doAnswer -import static org.mockito.Mockito.only import static org.mockito.Mockito.spy import static org.mockito.Mockito.times import static org.mockito.Mockito.verify @@ -414,6 +413,82 @@ class TestPutDatabaseRecord { } @Test + void testMultipleInsertsViaSqlStatementType() throws InitializationException, ProcessException, SQLException, IOException { + recreateTable("PERSONS", createPersons) + final MockRecordParser parser = new MockRecordParser() + runner.addControllerService("parser", parser) + runner.enableControllerService(parser) + + parser.addSchemaField("sql", RecordFieldType.STRING) + + parser.addRecord('''INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101);INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)''') + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser') + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE) + runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS') + runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql') + runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, 'true') + + def attrs = [:] + attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql' + runner.enqueue(new byte[0], attrs) + runner.run() + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1) + final Connection conn = dbcp.getConnection() + final Statement stmt = conn.createStatement() + final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS') + assertTrue(rs.next()) + assertEquals(1, rs.getInt(1)) + assertEquals('rec1', rs.getString(2)) + assertEquals(101, rs.getInt(3)) + assertTrue(rs.next()) + assertEquals(2, rs.getInt(1)) + assertEquals('rec2', rs.getString(2)) + assertEquals(102, rs.getInt(3)) + assertFalse(rs.next()) + + stmt.close() + conn.close() + } + + @Test + void testMultipleInsertsViaSqlStatementTypeBadSQL() throws InitializationException, ProcessException, SQLException, IOException { + recreateTable("PERSONS", createPersons) + final MockRecordParser parser = new MockRecordParser() + runner.addControllerService("parser", parser) + runner.enableControllerService(parser) + + parser.addSchemaField("sql", RecordFieldType.STRING) + + parser.addRecord('''INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101); + INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102); + INSERT INTO PERSONS2 (id, name, code) VALUES (2, 'rec2',102);''') + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser') + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE) + runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS') + runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql') + runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, 'true') + + def attrs = [:] + attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql' + runner.enqueue(new byte[0], attrs) + runner.run() + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0) + runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1) + final Connection conn = dbcp.getConnection() + final Statement stmt = conn.createStatement() + final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS') + // The first two legitimate statements should have been rolled back + assertFalse(rs.next()) + + stmt.close() + conn.close() + } + + @Test void testSqlStatementTypeNoValue() throws InitializationException, ProcessException, SQLException, IOException { recreateTable("PERSONS", createPersons) final MockRecordParser parser = new MockRecordParser()