This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 165a174c0a NIFI-13103: Make AutoCommit default to no value set in PutDatabaseRecord
165a174c0a is described below

commit 165a174c0a4bdb889907d22804fd8b6e42ecdf9e
Author: Joe Gresock <jgres...@gmail.com>
AuthorDate: Wed May 1 17:20:18 2024 -0400

    NIFI-13103: Make AutoCommit default to no value set in PutDatabaseRecord
    
    Signed-off-by: Joe Gresock <jgres...@gmail.com>
    This closes #8723
---
 .../processors/standard/PutDatabaseRecord.java     | 86 ++++++++------------
 .../processors/standard/PutDatabaseRecordTest.java | 94 +++++++++++++---------
 2 files changed, 92 insertions(+), 88 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index c81030d74c..c279569916 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -28,7 +28,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
@@ -74,7 +73,6 @@ import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.SQLDataException;
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLIntegrityConstraintViolationException;
 import java.sql.SQLTransientException;
 import java.sql.Statement;
@@ -86,7 +84,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -368,8 +365,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
     static final PropertyDescriptor MAX_BATCH_SIZE = new Builder()
             .name("put-db-record-max-batch-size")
             .displayName("Maximum Batch Size")
-            .description("Specifies maximum number of statements to be 
included in each batch. Zero means the batch size is not limited, "
-                    + "which can cause memory usage issues for a large number 
of statements.")
+            .description("Specifies maximum number of sql statements to be 
included in each batch sent to the database. Zero means the batch size is not 
limited, "
+                    + "and all statements are put into a single batch which 
can cause high memory usage issues for a very large number of statements.")
             .defaultValue("1000")
             .required(false)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
@@ -380,11 +377,11 @@ public class PutDatabaseRecord extends AbstractProcessor {
             .name("database-session-autocommit")
             .displayName("Database Session AutoCommit")
             .description("The autocommit mode to set on the database 
connection being used. If set to false, the operation(s) will be explicitly 
committed or rolled back "
-                    + "(based on success or failure respectively). If set to 
true, the driver/database automatically handles the commit/rollback.")
+                    + "(based on success or failure respectively). If set to 
true, the driver/database automatically handles the commit/rollback. "
+                    + "Setting this property to 'No value' will leave the 
database connection's autocommit mode unmodified.")
             .allowableValues("true", "false")
             .defaultValue("false")
             .required(false)
-            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
     static final PropertyDescriptor DB_TYPE;
@@ -491,21 +488,22 @@ public class PutDatabaseRecord extends AbstractProcessor {
             );
         }
 
-        final boolean autoCommit = validationContext.getProperty(AUTO_COMMIT).asBoolean();
+        final Boolean autoCommit = validationContext.getProperty(AUTO_COMMIT).asBoolean();
         final boolean rollbackOnFailure = validationContext.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
-        if (autoCommit && rollbackOnFailure) {
+        if (autoCommit != null && autoCommit && rollbackOnFailure) {
             validationResults.add(new ValidationResult.Builder()
                     .subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName())
                     .explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'. "
-                                    + "Transaction rollbacks for batch updates cannot be supported when auto commit is set to 'true'",
+                                    + "Transaction rollbacks for batch updates cannot rollback all the flow file's statements together "
+                                    + "when auto commit is set to 'true' because the database autocommits each batch separately.",
                             RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(), AUTO_COMMIT.getDisplayName()))
                     .build());
         }
 
-        if (autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) {
+        if (autoCommit != null && autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) {
                 final String explanation = format("'%s' must be hard-coded to zero when '%s' is set to 'true'."
                                 + " Batch size equal to zero executes all statements in a single transaction"
-                                + " which allows automatic rollback to revert all statements if an error occurs",
+                                + " which allows rollback to revert all the flow file's statements together if an error occurs.",
                         MAX_BATCH_SIZE.getDisplayName(), AUTO_COMMIT.getDisplayName());
 
                 validationResults.add(new ValidationResult.Builder()
@@ -547,11 +545,6 @@ public class PutDatabaseRecord extends AbstractProcessor {
         dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue);
     }
 
-    @OnUnscheduled
-    public final void onUnscheduled() {
-        supportsBatchUpdates = Optional.empty();
-    }
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
@@ -567,18 +560,18 @@ public class PutDatabaseRecord extends AbstractProcessor {
             connection = dbcpService.getConnection(flowFile.getAttributes());
 
             originalAutoCommit = connection.getAutoCommit();
-            final boolean autoCommit = context.getProperty(AUTO_COMMIT).asBoolean();
-            if (originalAutoCommit != autoCommit) {
+            final Boolean propertyAutoCommitValue = context.getProperty(AUTO_COMMIT).asBoolean();
+            if (propertyAutoCommitValue != null && originalAutoCommit != propertyAutoCommitValue) {
                 try {
-                    connection.setAutoCommit(autoCommit);
-                } catch (SQLFeatureNotSupportedException sfnse) {
-                    getLogger().debug(String.format("setAutoCommit(%s) not supported by this driver", autoCommit), sfnse);
+                    connection.setAutoCommit(propertyAutoCommitValue);
+                } catch (Exception ex) {
+                    getLogger().debug("Failed to setAutoCommit({}) due to {}", propertyAutoCommitValue, ex.getClass().getName(), ex);
                 }
             }
 
             putToDatabase(context, session, flowFile, connection);
 
-            // Only commit the connection if auto-commit is false
+            // If the connection's auto-commit setting is false, then manually commit the transaction
             if (!connection.getAutoCommit()) {
                 connection.commit();
             }
@@ -605,12 +598,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
             relationship = REL_FAILURE;
         }
 
-        getLogger().error("Failed to put Records to database for {}. Routing 
to {}.", flowFile, relationship, e);
-
         final boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
         if (rollbackOnFailure) {
+            getLogger().error("Failed to put Records to database for {}. Rolling back NiFi session and returning the flow file to its incoming queue.", flowFile, e);
             session.rollback();
+            context.yield();
         } else {
+            getLogger().error("Failed to put Records to database for {}. Routing to {}.", flowFile, relationship, e);
             flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown": e.getMessage()));
             session.transfer(flowFile, relationship);
         }
@@ -623,6 +617,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
             try {
                 if (!connection.getAutoCommit()) {
                     connection.rollback();
+                    getLogger().debug("Manually rolled back JDBC 
transaction.");
                 }
             } catch (final Exception rollbackException) {
                 getLogger().error("Failed to rollback JDBC transaction", 
rollbackException);
@@ -680,9 +675,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
             final ComponentLog log = getLogger();
             final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
-            // Do not use batch if set to batch size of 1 because that is similar to not using batching.
+            // Batch Size 0 means put all sql statements into one batch update no matter how many statements there are.
+            // Do not use batch statements if batch size is equal to 1 because that is the same as not using batching.
             // Also do not use batches if the connection does not support batching.
-            boolean useBatch = maxBatchSize != 1 && isSupportBatchUpdates(connection);
+            boolean useBatch = maxBatchSize != 1 && isSupportsBatchUpdates(connection);
             int currentBatchSize = 0;
             int batchIndex = 0;
 
@@ -1002,13 +998,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 try (InputStream inputStream = new ByteArrayInputStream(byteArray)) {
                     ps.setBlob(index, inputStream);
                 } catch (SQLException e) {
-                    throw new IOException("Unable to parse binary data " + value, e.getCause());
+                    throw new IOException("Unable to parse binary data " + value, e);
                 }
             } else {
                 try (InputStream inputStream = new ByteArrayInputStream(value.toString().getBytes(StandardCharsets.UTF_8))) {
                     ps.setBlob(index, inputStream);
                 } catch (IOException | SQLException e) {
-                    throw new IOException("Unable to parse binary data " + value, e.getCause());
+                    throw new IOException("Unable to parse binary data " + value, e);
                 }
             }
         } else if (sqlType == Types.CLOB) {
@@ -1024,7 +1020,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     clob.setString(1, value.toString());
                     ps.setClob(index, clob);
                 } catch (SQLException e) {
-                    throw new IOException("Unable to parse data as CLOB/String 
" + value, e.getCause());
+                    throw new IOException("Unable to parse data as CLOB/String 
" + value, e);
                 }
             }
         } else if (sqlType == Types.VARBINARY || sqlType == 
Types.LONGVARBINARY) {
@@ -1045,7 +1041,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 try {
                     ps.setBytes(index, byteArray);
                 } catch (SQLException e) {
-                    throw new IOException("Unable to parse binary data with 
size" + byteArray.length, e.getCause());
+                    throw new IOException("Unable to parse binary data with 
size" + byteArray.length, e);
                 }
             } else {
                 byte[] byteArray = new byte[0];
@@ -1053,7 +1049,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     byteArray = value.toString().getBytes(StandardCharsets.UTF_8);
                     ps.setBytes(index, byteArray);
                 } catch (SQLException e) {
-                    throw new IOException("Unable to parse binary data with size" + byteArray.length, e.getCause());
+                    throw new IOException("Unable to parse binary data with size" + byteArray.length, e);
                 }
             }
         } else {
@@ -1612,28 +1608,16 @@ public class PutDatabaseRecord extends AbstractProcessor {
         return normalizedKeyColumnNames;
     }
 
-    private Optional<Boolean> supportsBatchUpdates = Optional.empty();
-
-    private void initializeSupportBatchUpdates(Connection connection) {
-        if (!supportsBatchUpdates.isPresent()) {
-            try {
-                final DatabaseMetaData dmd = connection.getMetaData();
-                supportsBatchUpdates = Optional.of(dmd.supportsBatchUpdates());
-                getLogger().debug(String.format("Connection 
supportsBatchUpdates is %s",
-                        supportsBatchUpdates.orElse(Boolean.FALSE)));
-            } catch (Exception ex) {
-                supportsBatchUpdates = Optional.of(Boolean.FALSE);
-                getLogger().debug(String.format("Exception while testing if 
connection supportsBatchUpdates due to %s - %s",
-                        ex.getClass().getName(), ex.getMessage()));
-            }
+    private boolean isSupportsBatchUpdates(Connection connection) {
+        try {
+            return connection.getMetaData().supportsBatchUpdates();
+        } catch (Exception ex) {
+            getLogger().debug(String.format("Exception while testing if 
connection supportsBatchUpdates due to %s - %s",
+                    ex.getClass().getName(), ex.getMessage()));
+            return false;
         }
     }
 
-    private boolean isSupportBatchUpdates(Connection connection) {
-        initializeSupportBatchUpdates(connection);
-        return supportsBatchUpdates.orElse(Boolean.FALSE);
-    }
-
     static class SchemaKey {
         private final String catalog;
         private final String schemaName;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 5c5cd2e8fa..733b109e80 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -80,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -94,11 +95,12 @@ public class PutDatabaseRecordTest {
         // DISABLED test cases are used for single-run tests which are not parameterized
         DEFAULT_0(ENABLED, new TestCase(false, false, 0)),
         DEFAULT_1(DISABLED, new TestCase(false, false, 1)),
-        DEFAULT_2(DISABLED, new TestCase(false, false, 2)),
+        DEFAULT_2(DISABLED, new TestCase(null, false, 2)),
+        DEFAULT_5(DISABLED, new TestCase(null, false, 5)),
         DEFAULT_1000(DISABLED, new TestCase(false, false, 1000)),
 
         ROLLBACK_0(DISABLED, new TestCase(false, true, 0)),
-        ROLLBACK_1(DISABLED, new TestCase(false, true, 1)),
+        ROLLBACK_1(ENABLED, new TestCase(null, true, 1)),
         ROLLBACK_2(DISABLED, new TestCase(false, true, 2)),
         ROLLBACK_1000(ENABLED, new TestCase(false, true, 1000)),
 
@@ -199,7 +201,11 @@ public class PutDatabaseRecordTest {
         runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
         runner.enableControllerService(dbcp);
         runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
-        runner.setProperty(PutDatabaseRecord.AUTO_COMMIT, testCase.getAutoCommitAsString());
+        if (testCase.getAutoCommitAsString() == null) {
+            runner.removeProperty(PutDatabaseRecord.AUTO_COMMIT);
+        } else {
+            runner.setProperty(PutDatabaseRecord.AUTO_COMMIT, testCase.getAutoCommitAsString());
+        }
         runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, testCase.getRollbackOnFailureAsString());
         runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, testCase.getBatchSizeAsString());
     }
@@ -714,11 +720,29 @@ public class PutDatabaseRecordTest {
         runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql");
         runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, String.valueOf(allowMultipleStatements));
 
+        Supplier<Statement> spyStmt = createStatementSpy();
+
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql");
         runner.enqueue(new byte[0], attrs);
         runner.run();
 
+        final int maxBatchSize = runner.getProcessContext().getProperty(PutDatabaseRecord.MAX_BATCH_SIZE).asInteger();
+        assertNotNull(spyStmt.get());
+        if (sqlStatements.length <= 1) {
+            // When there is only 1 sql statement, then never use batching
+            verify(spyStmt.get(), times(0)).executeBatch();
+        } else if (maxBatchSize == 0) {
+            // When maxBatchSize is 0, verify that all statements were executed in a single executeBatch call
+            verify(spyStmt.get(), times(1)).executeBatch();
+        } else if (maxBatchSize == 1) {
+            // When maxBatchSize is 1, verify that executeBatch was never called
+            verify(spyStmt.get(), times(0)).executeBatch();
+        } else {
+            // When maxBatchSize > 1, verify that executeBatch was called at least once
+            verify(spyStmt.get(), atLeastOnce()).executeBatch();
+        }
+
         runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
@@ -1557,43 +1581,27 @@ public class PutDatabaseRecordTest {
     }
 
     @Test
-    public void testInsertWithMaxBatchSize() throws InitializationException, ProcessException, SQLException {
-        setRunner(TestCaseEnum.DEFAULT_0.getTestCase());
-
-        recreateTable(createPersons);
-        final MockRecordParser parser = new MockRecordParser();
-        runner.addControllerService("parser", parser);
-        runner.enableControllerService(parser);
-
-        parser.addSchemaField("id", RecordFieldType.INT);
-        parser.addSchemaField("name", RecordFieldType.STRING);
-        parser.addSchemaField("code", RecordFieldType.INT);
-
-        for (int i = 1; i < 12; i++) {
-            parser.addRecord(i, String.format("rec%s", i), 100 + i);
-        }
-
-        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
-        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
-        runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
-        runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, "5");
-
-        Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy();
-
-        runner.enqueue(new byte[0]);
-        runner.run();
-
-        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
+    public void testInsertWithMaxBatchSize0() throws InitializationException, ProcessException, SQLException {
+        testInsertWithBatchSize(TestCaseEnum.DEFAULT_0.getTestCase(), 1);
+    }
 
-        assertEquals(11, getTableSize());
+    @Test
+    public void testInsertWithMaxBatchSize1() throws InitializationException, ProcessException, SQLException {
+        testInsertWithBatchSize(TestCaseEnum.DEFAULT_1.getTestCase(), 11);
+    }
 
-        assertNotNull(spyStmt.get());
-        verify(spyStmt.get(), times(3)).executeBatch();
+    @Test
+    public void testInsertWithMaxBatchSize5() throws InitializationException, ProcessException, SQLException {
+        testInsertWithBatchSize(TestCaseEnum.DEFAULT_5.getTestCase(), 3);
     }
 
     @Test
-    public void testInsertWithDefaultMaxBatchSize() throws InitializationException, ProcessException, SQLException {
-        setRunner(TestCaseEnum.DEFAULT_1000.getTestCase());
+    public void testInsertWithMaxBatchSize1000() throws InitializationException, ProcessException, SQLException {
+        testInsertWithBatchSize(TestCaseEnum.DEFAULT_1000.getTestCase(), 1);
+    }
+
+    public void testInsertWithBatchSize(TestCase testCase, int expectedBatchCount) throws InitializationException, ProcessException, SQLException {
+        setRunner(testCase);
 
         recreateTable(createPersons);
         final MockRecordParser parser = new MockRecordParser();
@@ -1611,7 +1619,6 @@ public class PutDatabaseRecordTest {
         runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
         runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
         runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
-        runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, PutDatabaseRecord.MAX_BATCH_SIZE.getDefaultValue());
 
         Supplier<PreparedStatement> spyStmt = createPreparedStatementSpy();
 
@@ -1623,7 +1630,7 @@ public class PutDatabaseRecordTest {
         assertEquals(11, getTableSize());
 
         assertNotNull(spyStmt.get());
-        verify(spyStmt.get(), times(1)).executeBatch();
+        verify(spyStmt.get(), times(expectedBatchCount)).executeBatch();
     }
 
     @Test
@@ -2322,6 +2329,19 @@ public class PutDatabaseRecordTest {
         return () -> spyStmt[0];
     }
 
+    private Supplier<Statement> createStatementSpy() {
+        final Statement[] spyStmt = new Statement[1];
+        final Answer<DelegatingConnection> answer = (inv) -> new 
DelegatingConnection((Connection) inv.callRealMethod()) {
+            @Override
+            public Statement createStatement() throws SQLException {
+                spyStmt[0] = spy(getDelegate().createStatement());
+                return spyStmt[0];
+            }
+        };
+        doAnswer(answer).when(dbcp).getConnection();
+        return () -> spyStmt[0];
+    }
+
     static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
         @Override
         SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException {
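
For readers skimming the patch, the core behavioral change in onTrigger() is the tri-state handling of the AutoCommit property. The sketch below is illustrative only (the AutoCommitExample class and applyAutoCommit helper are hypothetical names, not part of this commit): leaving the property unset keeps the connection's autocommit mode exactly as the DBCP service handed it out, while an explicit true/false is applied only when it differs from the current mode, and a driver that rejects the change is only logged at debug level.

    import java.sql.Connection;
    import java.sql.SQLException;

    final class AutoCommitExample {
        // Mirrors the patched onTrigger() logic: a null property value means "leave the connection alone".
        static void applyAutoCommit(final Connection connection, final Boolean propertyValue) throws SQLException {
            final boolean originalAutoCommit = connection.getAutoCommit();
            if (propertyValue != null && originalAutoCommit != propertyValue) {
                try {
                    connection.setAutoCommit(propertyValue);
                } catch (final Exception ex) {
                    // As in the patch, a failed setAutoCommit() is not fatal; it is only reported.
                    System.out.printf("Failed to setAutoCommit(%s) due to %s%n", propertyValue, ex.getClass().getName());
                }
            }
        }
    }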

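The executeBatch() counts asserted by the new testInsertWithBatchSize variants follow from simple ceiling division over the 11 inserted records, with batch size 0 (and any size of at least 11, such as the 1000 default) collapsing to a single batch. The helper below is a back-of-envelope illustration of that arithmetic only (expectedExecuteBatchCalls is a hypothetical name, not code from this commit):

    // Expected executeBatch() invocations for the parameterized INSERT tests above.
    static int expectedExecuteBatchCalls(final int recordCount, final int maxBatchSize) {
        if (maxBatchSize == 0) {
            return 1; // batch size 0: every statement goes into a single batch
        }
        return (recordCount + maxBatchSize - 1) / maxBatchSize; // ceiling division
    }

    // expectedExecuteBatchCalls(11, 0)    == 1   -> testInsertWithMaxBatchSize0
    // expectedExecuteBatchCalls(11, 1)    == 11  -> testInsertWithMaxBatchSize1
    // expectedExecuteBatchCalls(11, 5)    == 3   -> testInsertWithMaxBatchSize5
    // expectedExecuteBatchCalls(11, 1000) == 1   -> testInsertWithMaxBatchSize1000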