This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 165a174c0a  NIFI-13103: Make AutoCommit default to no value set in PutDatabaseRecord
165a174c0a is described below

commit 165a174c0a4bdb889907d22804fd8b6e42ecdf9e
Author: Joe Gresock <jgres...@gmail.com>
AuthorDate: Wed May 1 17:20:18 2024 -0400

    NIFI-13103: Make AutoCommit default to no value set in PutDatabaseRecord

    Signed-off-by: Joe Gresock <jgres...@gmail.com>

    This closes #8723
---
 .../processors/standard/PutDatabaseRecord.java     | 86 ++++++++------------
 .../processors/standard/PutDatabaseRecordTest.java | 94 +++++++++++++---------
 2 files changed, 92 insertions(+), 88 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index c81030d74c..c279569916 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -28,7 +28,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
@@ -74,7 +73,6 @@ import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.SQLDataException;
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLIntegrityConstraintViolationException;
 import java.sql.SQLTransientException;
 import java.sql.Statement;
@@ -86,7 +84,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -368,8 +365,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
     static final PropertyDescriptor MAX_BATCH_SIZE = new Builder()
             .name("put-db-record-max-batch-size")
             .displayName("Maximum Batch Size")
-            .description("Specifies maximum number of statements to be included in each batch. Zero means the batch size is not limited, "
-                    + "which can cause memory usage issues for a large number of statements.")
+            .description("Specifies maximum number of sql statements to be included in each batch sent to the database. Zero means the batch size is not limited, "
+                    + "and all statements are put into a single batch which can cause high memory usage issues for a very large number of statements.")
             .defaultValue("1000")
             .required(false)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
@@ -380,11 +377,11 @@
             .name("database-session-autocommit")
             .displayName("Database Session AutoCommit")
             .description("The autocommit mode to set on the database connection being used. If set to false, the operation(s) will be explicitly committed or rolled back "
-                    + "(based on success or failure respectively). If set to true, the driver/database automatically handles the commit/rollback.")
+                    + "(based on success or failure respectively). If set to true, the driver/database automatically handles the commit/rollback. "
+                    + "Setting this property to 'No value' will leave the database connection's autocommit mode unmodified.")
             .allowableValues("true", "false")
             .defaultValue("false")
             .required(false)
-            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
     static final PropertyDescriptor DB_TYPE;
@@ -491,21 +488,22 @@ public class PutDatabaseRecord extends AbstractProcessor {
             );
         }
 
-        final boolean autoCommit = validationContext.getProperty(AUTO_COMMIT).asBoolean();
+        final Boolean autoCommit = validationContext.getProperty(AUTO_COMMIT).asBoolean();
         final boolean rollbackOnFailure = validationContext.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
-        if (autoCommit && rollbackOnFailure) {
+        if (autoCommit != null && autoCommit && rollbackOnFailure) {
             validationResults.add(new ValidationResult.Builder()
                     .subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName())
                     .explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'. "
-                            + "Transaction rollbacks for batch updates cannot be supported when auto commit is set to 'true'",
+                            + "Transaction rollbacks for batch updates cannot rollback all the flow file's statements together "
+                            + "when auto commit is set to 'true' because the database autocommits each batch separately.",
                             RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(), AUTO_COMMIT.getDisplayName()))
                     .build());
         }
 
-        if (autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) {
+        if (autoCommit != null && autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) {
             final String explanation = format("'%s' must be hard-coded to zero when '%s' is set to 'true'."
                     + " Batch size equal to zero executes all statements in a single transaction"
-                    + " which allows automatic rollback to revert all statements if an error occurs",
+                    + " which allows rollback to revert all the flow file's statements together if an error occurs.",
                     MAX_BATCH_SIZE.getDisplayName(), AUTO_COMMIT.getDisplayName());
 
             validationResults.add(new ValidationResult.Builder()
@@ -547,11 +545,6 @@ public class PutDatabaseRecord extends AbstractProcessor {
         dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue);
     }
 
-    @OnUnscheduled
-    public final void onUnscheduled() {
-        supportsBatchUpdates = Optional.empty();
-    }
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
@@ -567,18 +560,18 @@
             connection = dbcpService.getConnection(flowFile.getAttributes());
 
            originalAutoCommit = connection.getAutoCommit();
-            final boolean autoCommit = context.getProperty(AUTO_COMMIT).asBoolean();
-            if (originalAutoCommit != autoCommit) {
+            final Boolean propertyAutoCommitValue = context.getProperty(AUTO_COMMIT).asBoolean();
+            if (propertyAutoCommitValue != null && originalAutoCommit != propertyAutoCommitValue) {
                 try {
-                    connection.setAutoCommit(autoCommit);
-                } catch (SQLFeatureNotSupportedException sfnse) {
-                    getLogger().debug(String.format("setAutoCommit(%s) not supported by this driver", autoCommit), sfnse);
+                    connection.setAutoCommit(propertyAutoCommitValue);
+                } catch (Exception ex) {
+                    getLogger().debug("Failed to setAutoCommit({}) due to {}", propertyAutoCommitValue, ex.getClass().getName(), ex);
                 }
             }
 
             putToDatabase(context, session, flowFile, connection);
 
-            // Only commit the connection if auto-commit is false
+            // If the connection's auto-commit setting is false, then manually commit the transaction
             if (!connection.getAutoCommit()) {
                 connection.commit();
             }
@@ -605,12 +598,13 @@
                 relationship = REL_FAILURE;
             }
 
-            getLogger().error("Failed to put Records to database for {}. Routing to {}.", flowFile, relationship, e);
-
             final boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
             if (rollbackOnFailure) {
+                getLogger().error("Failed to put Records to database for {}. Rolling back NiFi session and returning the flow file to its incoming queue.", flowFile, e);
                 session.rollback();
+                context.yield();
             } else {
+                getLogger().error("Failed to put Records to database for {}. Routing to {}.", flowFile, relationship, e);
                 flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown": e.getMessage()));
                 session.transfer(flowFile, relationship);
             }
@@ -623,6 +617,7 @@
                 try {
                     if (!connection.getAutoCommit()) {
                         connection.rollback();
+                        getLogger().debug("Manually rolled back JDBC transaction.");
                     }
                 } catch (final Exception rollbackException) {
                     getLogger().error("Failed to rollback JDBC transaction", rollbackException);
@@ -680,9 +675,10 @@
         final ComponentLog log = getLogger();
 
         final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
-        // Do not use batch if set to batch size of 1 because that is similar to not using batching.
+        // Batch Size 0 means put all sql statements into one batch update no matter how many statements there are.
+        // Do not use batch statements if batch size is equal to 1 because that is the same as not using batching.
         // Also do not use batches if the connection does not support batching.
-        boolean useBatch = maxBatchSize != 1 && isSupportBatchUpdates(connection);
+        boolean useBatch = maxBatchSize != 1 && isSupportsBatchUpdates(connection);
         int currentBatchSize = 0;
         int batchIndex = 0;
 
@@ -1002,13 +998,13 @@
                 try (InputStream inputStream = new ByteArrayInputStream(byteArray)) {
                     ps.setBlob(index, inputStream);
                 } catch (SQLException e) {
-                    throw new IOException("Unable to parse binary data " + value, e.getCause());
+                    throw new IOException("Unable to parse binary data " + value, e);
                 }
             } else {
                 try (InputStream inputStream = new ByteArrayInputStream(value.toString().getBytes(StandardCharsets.UTF_8))) {
                     ps.setBlob(index, inputStream);
                 } catch (IOException | SQLException e) {
-                    throw new IOException("Unable to parse binary data " + value, e.getCause());
+                    throw new IOException("Unable to parse binary data " + value, e);
                 }
             }
         } else if (sqlType == Types.CLOB) {
@@ -1024,7 +1020,7 @@
                     clob.setString(1, value.toString());
                     ps.setClob(index, clob);
                 } catch (SQLException e) {
-                    throw new IOException("Unable to parse data as CLOB/String " + value, e.getCause());
+                    throw new IOException("Unable to parse data as CLOB/String " + value, e);
                 }
             }
         } else if (sqlType == Types.VARBINARY || sqlType == Types.LONGVARBINARY) {
@@ -1045,7 +1041,7 @@
                 try {
                     ps.setBytes(index, byteArray);
                 } catch (SQLException e) {
-                    throw new IOException("Unable to parse binary data with size" + byteArray.length, e.getCause());
+                    throw new IOException("Unable to parse binary data with size" + byteArray.length, e);
                 }
             } else {
                 byte[] byteArray = new byte[0];
@@ -1053,7 +1049,7 @@
                     byteArray = value.toString().getBytes(StandardCharsets.UTF_8);
                     ps.setBytes(index, byteArray);
                 } catch (SQLException e) {
-                    throw new IOException("Unable to parse binary data with size" + byteArray.length, e.getCause());
+                    throw new IOException("Unable to parse binary data with size" + byteArray.length, e);
                 }
             }
         } else {
@@ -1612,28 +1608,16 @@
         return normalizedKeyColumnNames;
     }
 
-    private Optional<Boolean> supportsBatchUpdates = Optional.empty();
-
-    private void initializeSupportBatchUpdates(Connection connection) {
-        if (!supportsBatchUpdates.isPresent()) {
-            try {
-                final DatabaseMetaData dmd = connection.getMetaData();
-                supportsBatchUpdates = Optional.of(dmd.supportsBatchUpdates());
-                getLogger().debug(String.format("Connection supportsBatchUpdates is %s",
-                        supportsBatchUpdates.orElse(Boolean.FALSE)));
-            } catch (Exception ex) {
-                supportsBatchUpdates = Optional.of(Boolean.FALSE);
-                getLogger().debug(String.format("Exception while testing if connection supportsBatchUpdates due to %s - %s",
-                        ex.getClass().getName(), ex.getMessage()));
-            }
+    private boolean isSupportsBatchUpdates(Connection connection) {
+        try {
+            return connection.getMetaData().supportsBatchUpdates();
+        } catch (Exception ex) {
+            getLogger().debug(String.format("Exception while testing if connection supportsBatchUpdates due to %s - %s",
+                    ex.getClass().getName(), ex.getMessage()));
+            return false;
         }
     }
 
-    private boolean isSupportBatchUpdates(Connection connection) {
-        initializeSupportBatchUpdates(connection);
-        return supportsBatchUpdates.orElse(Boolean.FALSE);
-    }
-
     static class SchemaKey {
         private final String catalog;
         private final String schemaName;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 5c5cd2e8fa..733b109e80 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -80,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -94,11 +95,12 @@ public class PutDatabaseRecordTest {
         // DISABLED test cases are used for single-run tests which are not parameterized
         DEFAULT_0(ENABLED, new TestCase(false, false, 0)),
         DEFAULT_1(DISABLED, new TestCase(false, false, 1)),
-        DEFAULT_2(DISABLED, new TestCase(false, false, 2)),
+        DEFAULT_2(DISABLED, new TestCase(null, false, 2)),
+        DEFAULT_5(DISABLED, new TestCase(null, false, 5)),
         DEFAULT_1000(DISABLED, new TestCase(false, false, 1000)),
 
         ROLLBACK_0(DISABLED, new TestCase(false, true, 0)),
-        ROLLBACK_1(DISABLED, new TestCase(false, true, 1)),
+        ROLLBACK_1(ENABLED, new TestCase(null, true, 1)),
         ROLLBACK_2(DISABLED, new TestCase(false, true, 2)),
         ROLLBACK_1000(ENABLED, new TestCase(false, true, 1000)),
 
@@ -199,7 +201,11 @@
         runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
         runner.enableControllerService(dbcp);
         runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
-        runner.setProperty(PutDatabaseRecord.AUTO_COMMIT, testCase.getAutoCommitAsString());
+        if (testCase.getAutoCommitAsString() == null) {
+            runner.removeProperty(PutDatabaseRecord.AUTO_COMMIT);
+        } else {
+            runner.setProperty(PutDatabaseRecord.AUTO_COMMIT, testCase.getAutoCommitAsString());
+        }
         runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, testCase.getRollbackOnFailureAsString());
         runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, testCase.getBatchSizeAsString());
     }
@@ -714,11 +720,29 @@
         runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql");
         runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, String.valueOf(allowMultipleStatements));
 
+        Supplier<Statement> spyStmt = createStatementSpy();
+
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql");
         runner.enqueue(new byte[0], attrs);
         runner.run();
 
+        final int maxBatchSize = runner.getProcessContext().getProperty(PutDatabaseRecord.MAX_BATCH_SIZE).asInteger();
+        assertNotNull(spyStmt.get());
+        if (sqlStatements.length <= 1) {
+            // When there is only 1 sql statement, then never use batching
+            verify(spyStmt.get(), times(0)).executeBatch();
+        } else if (maxBatchSize == 0) {
+            // When maxBatchSize is 0, verify that all statements were executed in a single executeBatch call
+            verify(spyStmt.get(), times(1)).executeBatch();
+        } else if (maxBatchSize == 1) {
+            // When maxBatchSize is 1, verify that executeBatch was never called
+            verify(spyStmt.get(), times(0)).executeBatch();
+        } else {
+            // When maxBatchSize > 1, verify that executeBatch was called at least once
+            verify(spyStmt.get(), atLeastOnce()).executeBatch();
+        }
+
         runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
@@ -1557,43 +1581,27 @@
     }
 
     @Test
-    public void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException {
-        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
-
-        recreateTable(createPersons);
-        final MockRecordParser parser = new MockRecordParser();
-        runner.addControllerService("parser", parser);
-        runner.enableControllerService(parser);
-
-        parser.addSchemaField("id", RecordFieldType.INT);
-        parser.addSchemaField("name", RecordFieldType.STRING);
-        parser.addSchemaField("code", RecordFieldType.INT);
-
-        for (int i = 1; i < 12; i++) {
-            parser.addRecord(i, String.format("rec%s", i), 100 + i);
-        }
-
-        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
-        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
-        runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
-        runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, "5");
-
-        Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy();
-
-        runner.enqueue(new byte[0]);
-        runner.run();
-
-        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
+    public void testInsertWithMaxBatchSize0() throws InitializationException, ProcessException, SQLException {
+        testInsertWithBatchSize(TestCaseEnum.DEFAULT_0.getTestCase(), 1);
+    }
 
-        assertEquals(11, getTableSize());
+    @Test
+    public void testInsertWithMaxBatchSize1() throws InitializationException, ProcessException, SQLException {
+        testInsertWithBatchSize(TestCaseEnum.DEFAULT_1.getTestCase(), 11);
+    }
 
-        assertNotNull(spyStmt.get());
-        verify(spyStmt.get(), times(3)).executeBatch();
+    @Test
+    public void testInsertWithMaxBatchSize5() throws InitializationException, ProcessException, SQLException {
+        testInsertWithBatchSize(TestCaseEnum.DEFAULT_5.getTestCase(), 3);
     }
 
     @Test
-    public void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException {
-        setRunner(TestCaseEnum.DEFAULT_1000.getTestCase());
+    public void testInsertWithMaxBatchSize1000() throws InitializationException, ProcessException, SQLException {
+        testInsertWithBatchSize(TestCaseEnum.DEFAULT_1000.getTestCase(), 1);
+    }
+
+    public void testInsertWithBatchSize(TestCase testCase, int expectedBatchCount) throws InitializationException, ProcessException, SQLException {
+        setRunner(testCase);
 
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
@@ -1611,7 +1619,6 @@
         runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
         runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
         runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
-        runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, PutDatabaseRecord.MAX_BATCH_SIZE.getDefaultValue());
 
         Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy();
 
@@ -1623,7 +1630,7 @@
         assertEquals(11, getTableSize());
 
         assertNotNull(spyStmt.get());
-        verify(spyStmt.get(), times(1)).executeBatch();
+        verify(spyStmt.get(), times(expectedBatchCount)).executeBatch();
     }
 
     @Test
@@ -2322,6 +2329,19 @@
         return () -> spyStmt[0];
     }
 
+    private Supplier<Statement> createStatementSpy() {
+        final Statement[] spyStmt = new Statement[1];
+        final Answer<DelegatingConnection> answer = (inv) -> new DelegatingConnection((Connection) inv.callRealMethod()) {
+            @Override
+            public Statement createStatement() throws SQLException {
+                spyStmt[0] = spy(getDelegate().createStatement());
+                return spyStmt[0];
+            }
+        };
+        doAnswer(answer).when(dbcp).getConnection();
+        return () -> spyStmt[0];
+    }
+
     static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
         @Override
         SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException {
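
The heart of the change, for anyone skimming the diff: Database Session AutoCommit is now effectively a tri-state property. With no value set (the new default), onTrigger() never calls setAutoCommit() and simply works with whatever autocommit mode the connection arrived with from the DBCP pool. A minimal standalone sketch of that decision logic, assuming a plain JDBC connection; the H2 in-memory URL, class name, and main() harness here are illustrative only and not part of the commit:

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class AutoCommitSketch {
        public static void main(String[] args) throws Exception {
            // Stand-in for context.getProperty(AUTO_COMMIT).asBoolean():
            // null when the property has no value set, a Boolean otherwise.
            final Boolean propertyAutoCommitValue = null;

            // Assumes the H2 driver is on the classpath.
            try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:sketch")) {
                final boolean originalAutoCommit = connection.getAutoCommit();
                // Only touch the connection when the property is explicitly set
                // AND differs from the connection's current mode; with no value,
                // the connection keeps the mode it came with from the pool.
                if (propertyAutoCommitValue != null && originalAutoCommit != propertyAutoCommitValue) {
                    connection.setAutoCommit(propertyAutoCommitValue);
                }

                // ... execute statements here ...

                // A manual commit is only needed (or legal) when autocommit is off.
                if (!connection.getAutoCommit()) {
                    connection.commit();
                }
            }
        }
    }

With the property unset, a pooled connection that is configured for autocommit will keep committing each batch as it executes; per the new property description, set the property to false explicitly when the whole flow file's statements must commit or roll back together.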
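The tightened customValidate() follows the same tri-state rule: both guards fire only when the property is explicitly 'true', so an unset AutoCommit no longer conflicts with Rollback On Failure or a non-zero batch size. A condensed sketch of the two predicates; the class, method, and parameter names are mine, for illustration:

    public class AutoCommitValidationSketch {
        /** Condensed form of the two guards added to customValidate(). */
        static boolean isInvalidCombination(Boolean autoCommit, boolean rollbackOnFailure, boolean maxBatchSizeIsZero) {
            final boolean autoCommitOn = autoCommit != null && autoCommit;
            // Guard 1: 'true' cannot be combined with Rollback On Failure, because
            // each batch is committed as it executes and cannot be rolled back.
            // Guard 2: 'true' requires Maximum Batch Size hard-coded to zero, so all
            // statements land in a single batch and succeed or fail together.
            return (autoCommitOn && rollbackOnFailure) || (autoCommitOn && !maxBatchSizeIsZero);
        }

        public static void main(String[] args) {
            System.out.println(isInvalidCombination(null, true, false));  // false: unset no longer conflicts
            System.out.println(isInvalidCombination(true, true, true));   // true: guard 1
            System.out.println(isInvalidCombination(true, false, false)); // true: guard 2
        }
    }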
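The reworked insert tests also pin the batching arithmetic down: for the 11-record insert on the PreparedStatement path, Maximum Batch Size 0 is asserted to produce one executeBatch() call, 1 produces eleven, 5 produces three, and 1000 produces one. That is ceiling division with zero treated as "unlimited"; a small sketch of the expected count (class and method names are mine, not from the commit):

    public class BatchCountSketch {
        /**
         * Expected executeBatch() invocations for n statements at a given
         * Maximum Batch Size: 0 means one unlimited batch, otherwise ceil(n / size).
         */
        static int expectedExecuteBatchCalls(int statementCount, int maxBatchSize) {
            if (maxBatchSize == 0) {
                return 1; // unlimited: every statement goes into a single batch
            }
            return (statementCount + maxBatchSize - 1) / maxBatchSize;
        }

        public static void main(String[] args) {
            // The 11-record insert exercised by the new tests:
            System.out.println(expectedExecuteBatchCalls(11, 0));    // 1
            System.out.println(expectedExecuteBatchCalls(11, 1));    // 11
            System.out.println(expectedExecuteBatchCalls(11, 5));    // 3
            System.out.println(expectedExecuteBatchCalls(11, 1000)); // 1
        }
    }

For example, expectedExecuteBatchCalls(11, 5) returns 3, matching times(3) in testInsertWithMaxBatchSize5 above.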