Repository: nifi
Updated Branches:
  refs/heads/master 13011ac6d -> d319a3ef2
NIFI-5788: Introduce batch size limit in PutDatabaseRecord processor NIFI-5788: Introduce batch size limit in PutDatabaseRecord processor Renamed 'batch size' to 'Maximum Batch Size'. Changed default value of max_batch_size to zero (INFINITE) Fixed parameter validation. Added unit tests Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #3128 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d319a3ef Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d319a3ef Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d319a3ef Branch: refs/heads/master Commit: d319a3ef2f14317f29a1be5a189bc34f8b3fdbd6 Parents: 13011ac Author: vadimar <varshav...@ebay.com> Authored: Mon Nov 5 13:15:12 2018 +0200 Committer: Matthew Burgess <mattyb...@apache.org> Committed: Thu Nov 15 10:31:34 2018 -0500 ---------------------------------------------------------------------- .../processors/standard/PutDatabaseRecord.java | 29 +++++- .../standard/TestPutDatabaseRecord.groovy | 103 +++++++++++++++++++ 2 files changed, 130 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d319a3ef/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 2f2d901..d79cf3c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -275,6 +275,17 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { .required(true) .build(); + static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("put-db-record-max-batch-size") + .displayName("Maximum Batch Size") + .description("Specifies maximum batch size for INSERT and UPDATE statements. This parameter has no effect for other statements specified in 'Statement Type'." + + " Zero means the batch size is not limited.") + .defaultValue("0") + .required(false) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + protected static List<PropertyDescriptor> propDescriptors; private Cache<SchemaKey, TableSchema> schemaCache; @@ -303,6 +314,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { pds.add(QUERY_TIMEOUT); pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE); pds.add(TABLE_SCHEMA_CACHE_SIZE); + pds.add(MAX_BATCH_SIZE); propDescriptors = Collections.unmodifiableList(pds); } @@ -641,6 +653,10 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { Record currentRecord; List<Integer> fieldIndexes = sqlHolder.getFieldIndexes(); + final Integer maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger(); + int currentBatchSize = 0; + int batchIndex = 0; + while ((currentRecord = recordParser.nextRecord()) != null) { Object[] values = currentRecord.getValues(); if (values != null) { @@ -667,11 +683,20 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { } } ps.addBatch(); + if (++currentBatchSize == maxBatchSize) { + batchIndex++; + log.debug("Executing query {}; fieldIndexes: {}; batch index: {}; batch size: {}", new Object[]{sqlHolder.getSql(), 
sqlHolder.getFieldIndexes(), batchIndex, currentBatchSize}); + ps.executeBatch(); + currentBatchSize = 0; + } } } - log.debug("Executing query {}", new Object[]{sqlHolder}); - ps.executeBatch(); + if (currentBatchSize > 0) { + batchIndex++; + log.debug("Executing query {}; fieldIndexes: {}; batch index: {}; batch size: {}", new Object[]{sqlHolder.getSql(), sqlHolder.getFieldIndexes(), batchIndex, currentBatchSize}); + ps.executeBatch(); + } result.routeTo(flowFile, REL_SUCCESS); session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl); http://git-wip-us.apache.org/repos/asf/nifi/blob/d319a3ef/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy index ebf8460..0ddac80 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy @@ -16,6 +16,8 @@ */ package org.apache.nifi.processors.standard + +import org.apache.commons.dbcp2.DelegatingConnection import org.apache.nifi.processor.exception.ProcessException import org.apache.nifi.processor.util.pattern.RollbackOnFailure import org.apache.nifi.reporting.InitializationException @@ -36,17 +38,25 @@ import org.junit.runners.JUnit4 import java.sql.Connection import java.sql.DriverManager +import java.sql.PreparedStatement import java.sql.ResultSet import java.sql.SQLDataException import java.sql.SQLException 
import java.sql.SQLNonTransientConnectionException import java.sql.Statement +import java.util.function.Supplier import static org.junit.Assert.assertEquals import static org.junit.Assert.assertFalse +import static org.junit.Assert.assertNotNull import static org.junit.Assert.assertTrue import static org.junit.Assert.fail +import static org.mockito.Matchers.anyMap +import static org.mockito.Mockito.doAnswer +import static org.mockito.Mockito.only import static org.mockito.Mockito.spy +import static org.mockito.Mockito.times +import static org.mockito.Mockito.verify /** * Unit tests for the PutDatabaseRecord processor @@ -718,6 +728,99 @@ class TestPutDatabaseRecord { conn.close() } + @Test + void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException { + recreateTable("PERSONS", createPersons) + final MockRecordParser parser = new MockRecordParser() + runner.addControllerService("parser", parser) + runner.enableControllerService(parser) + + parser.addSchemaField("id", RecordFieldType.INT) + parser.addSchemaField("name", RecordFieldType.STRING) + parser.addSchemaField("code", RecordFieldType.INT) + + (1..11).each { + parser.addRecord(it, "rec$it".toString(), 100 + it) + } + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser') + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE) + runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS') + runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, "5") + + Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy() + + runner.enqueue(new byte[0]) + runner.run() + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1) + + assertEquals(11, getTableSize()) + + assertNotNull(spyStmt.get()) + verify(spyStmt.get(), times(3)).executeBatch() + } + + @Test + void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException, IOException { + recreateTable("PERSONS", 
createPersons) + final MockRecordParser parser = new MockRecordParser() + runner.addControllerService("parser", parser) + runner.enableControllerService(parser) + + parser.addSchemaField("id", RecordFieldType.INT) + parser.addSchemaField("name", RecordFieldType.STRING) + parser.addSchemaField("code", RecordFieldType.INT) + + (1..11).each { + parser.addRecord(it, "rec$it".toString(), 100 + it) + } + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser') + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE) + runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS') + + Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy() + + runner.enqueue(new byte[0]) + runner.run() + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1) + + assertEquals(11, getTableSize()) + + assertNotNull(spyStmt.get()) + verify(spyStmt.get(), times(1)).executeBatch() + } + + private Supplier<PreparedStatement> createPreparedStatementSpy() { + PreparedStatement spyStmt + doAnswer({ inv -> + new DelegatingConnection((Connection)inv.callRealMethod()) { + @Override + PreparedStatement prepareStatement(String sql) throws SQLException { + spyStmt = spy(getDelegate().prepareStatement(sql)) + } + } + }).when(dbcp).getConnection(anyMap()) + return { spyStmt } + } + + private int getTableSize() { + final Connection connection = dbcp.getConnection() + try { + final Statement stmt = connection.createStatement() + try { + final ResultSet rs = stmt.executeQuery('SELECT count(*) FROM PERSONS') + assertTrue(rs.next()) + rs.getInt(1) + } finally { + stmt.close() + } + } finally { + connection.close() + } + } private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException { final Connection conn = dbcp.getConnection()