This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c06340c5a NIFI-13103 Enhance AutoCommit property in PutDatabaseRecord
0c06340c5a is described below

commit 0c06340c5a02eb369571bcd59512e6f0890a0da8
Author: Jim Steinebrey <jrsteineb...@gmail.com>
AuthorDate: Thu Jun 6 18:12:52 2024 -0400

    NIFI-13103 Enhance AutoCommit property in PutDatabaseRecord
    
    Signed-off-by: Matt Burgess <mattyb...@apache.org>
    
    This closes #8937
---
 .../processors/standard/PutDatabaseRecord.java     | 79 +++++++++-------------
 .../processors/standard/PutDatabaseRecordTest.java | 45 ++++++++++--
 2 files changed, 72 insertions(+), 52 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 3210c609a4..566794ad34 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -27,7 +27,6 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.documentation.UseCase;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
@@ -72,7 +71,6 @@ import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.SQLDataException;
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLIntegrityConstraintViolationException;
 import java.sql.SQLTransientException;
 import java.sql.Statement;
@@ -86,7 +84,6 @@ import java.util.HashSet;
 import java.util.HexFormat;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -385,7 +382,6 @@ public class PutDatabaseRecord extends AbstractProcessor {
             .allowableValues("true", "false")
             .defaultValue("false")
             .required(false)
-            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
     static final PropertyDescriptor DB_TYPE;
@@ -479,21 +475,22 @@ public class PutDatabaseRecord extends AbstractProcessor {
             );
         }
 
-        final boolean autoCommit = 
validationContext.getProperty(AUTO_COMMIT).asBoolean();
+        final Boolean autoCommit = 
validationContext.getProperty(AUTO_COMMIT).asBoolean();
         final boolean rollbackOnFailure = 
validationContext.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
-        if (autoCommit && rollbackOnFailure) {
+        if (autoCommit != null && autoCommit && rollbackOnFailure) {
             validationResults.add(new ValidationResult.Builder()
                     
.subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName())
                     .explanation(format("'%s' cannot be set to 'true' when 
'%s' is also set to 'true'. "
-                                    + "Transaction rollbacks for batch updates 
cannot be supported when auto commit is set to 'true'",
+                                    + "Transaction rollbacks for batch updates 
cannot rollback all the flow file's statements together "
+                                    + "when auto commit is set to 'true' 
because the database autocommits each batch separately.",
                             
RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(), 
AUTO_COMMIT.getDisplayName()))
                     .build());
         }
 
-        if (autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) {
+        if (autoCommit != null && autoCommit && 
!isMaxBatchSizeHardcodedToZero(validationContext)) {
                 final String explanation = format("'%s' must be hard-coded to 
zero when '%s' is set to 'true'."
                                 + " Batch size equal to zero executes all 
statements in a single transaction"
-                                + " which allows automatic rollback to revert 
all statements if an error occurs",
+                                + " which allows rollback to revert all the 
flow file's statements together if an error occurs.",
                         MAX_BATCH_SIZE.getDisplayName(), 
AUTO_COMMIT.getDisplayName());
 
                 validationResults.add(new ValidationResult.Builder()
@@ -535,11 +532,6 @@ public class PutDatabaseRecord extends AbstractProcessor {
         dataRecordPath = dataRecordPathValue == null ? null : 
RecordPath.compile(dataRecordPathValue);
     }
 
-    @OnUnscheduled
-    public final void onUnscheduled() {
-        supportsBatchUpdates = Optional.empty();
-    }
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         FlowFile flowFile = session.get();
@@ -555,18 +547,18 @@ public class PutDatabaseRecord extends AbstractProcessor {
             connection = dbcpService.getConnection(flowFile.getAttributes());
 
             originalAutoCommit = connection.getAutoCommit();
-            final boolean autoCommit = 
context.getProperty(AUTO_COMMIT).asBoolean();
-            if (originalAutoCommit != autoCommit) {
+            final Boolean propertyAutoCommitValue = 
context.getProperty(AUTO_COMMIT).asBoolean();
+            if (propertyAutoCommitValue != null && originalAutoCommit != 
propertyAutoCommitValue) {
                 try {
-                    connection.setAutoCommit(autoCommit);
-                } catch (SQLFeatureNotSupportedException sfnse) {
-                    getLogger().debug(String.format("setAutoCommit(%s) not 
supported by this driver", autoCommit), sfnse);
+                    connection.setAutoCommit(propertyAutoCommitValue);
+                } catch (Exception ex) {
+                    getLogger().debug("Failed to setAutoCommit({}) due to {}", 
propertyAutoCommitValue, ex.getClass().getName(), ex);
                 }
             }
 
             putToDatabase(context, session, flowFile, connection);
 
-            // Only commit the connection if auto-commit is false
+            // If the connection's auto-commit setting is false, then manually 
commit the transaction
             if (!connection.getAutoCommit()) {
                 connection.commit();
             }
@@ -593,12 +585,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
             relationship = REL_FAILURE;
         }
 
-        getLogger().error("Failed to put Records to database for {}. Routing 
to {}.", flowFile, relationship, e);
-
         final boolean rollbackOnFailure = 
context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
         if (rollbackOnFailure) {
+            getLogger().error("Failed to put Records to database for {}. 
Rolling back NiFi session and returning the flow file to its incoming queue.", 
flowFile, e);
             session.rollback();
+            context.yield();
         } else {
+            getLogger().error("Failed to put Records to database for {}. 
Routing to {}.", flowFile, relationship, e);
             flowFile = session.putAttribute(flowFile, 
PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown" : 
e.getMessage()));
             session.transfer(flowFile, relationship);
         }
@@ -611,6 +604,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
             try {
                 if (!connection.getAutoCommit()) {
                     connection.rollback();
+                    getLogger().debug("Manually rolled back JDBC 
transaction.");
                 }
             } catch (final Exception rollbackException) {
                 getLogger().error("Failed to rollback JDBC transaction", 
rollbackException);
@@ -668,9 +662,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
             final ComponentLog log = getLogger();
             final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
-            // Do not use batch if set to batch size of 1 because that is 
similar to not using batching.
+            // Batch Size 0 means put all sql statements into one batch update 
no matter how many statements there are.
+            // Do not use batch statements if batch size is equal to 1 because 
that is the same as not using batching.
             // Also do not use batches if the connection does not support 
batching.
-            boolean useBatch = maxBatchSize != 1 && 
isSupportBatchUpdates(connection);
+            boolean useBatch = maxBatchSize != 1 && 
isSupportsBatchUpdates(connection);
             int currentBatchSize = 0;
             int batchIndex = 0;
 
@@ -990,13 +985,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 try (InputStream inputStream = new 
ByteArrayInputStream(byteArray)) {
                     ps.setBlob(index, inputStream);
                 } catch (SQLException e) {
-                    throw new IOException("Unable to parse binary data " + 
value, e.getCause());
+                    throw new IOException("Unable to parse binary data " + 
value, e);
                 }
             } else {
                 try (InputStream inputStream = new 
ByteArrayInputStream(value.toString().getBytes(StandardCharsets.UTF_8))) {
                     ps.setBlob(index, inputStream);
                 } catch (IOException | SQLException e) {
-                    throw new IOException("Unable to parse binary data " + 
value, e.getCause());
+                    throw new IOException("Unable to parse binary data " + 
value, e);
                 }
             }
         } else if (sqlType == Types.CLOB) {
@@ -1012,7 +1007,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     clob.setString(1, value.toString());
                     ps.setClob(index, clob);
                 } catch (SQLException e) {
-                    throw new IOException("Unable to parse data as CLOB/String 
" + value, e.getCause());
+                    throw new IOException("Unable to parse data as CLOB/String 
" + value, e);
                 }
             }
         } else if (sqlType == Types.VARBINARY || sqlType == 
Types.LONGVARBINARY) {
@@ -1033,7 +1028,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 try {
                     ps.setBytes(index, byteArray);
                 } catch (SQLException e) {
-                    throw new IOException("Unable to parse binary data with 
size" + byteArray.length, e.getCause());
+                    throw new IOException("Unable to parse binary data with 
size" + byteArray.length, e);
                 }
             } else {
                 byte[] byteArray = new byte[0];
@@ -1041,7 +1036,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     byteArray = 
value.toString().getBytes(StandardCharsets.UTF_8);
                     ps.setBytes(index, byteArray);
                 } catch (SQLException e) {
-                    throw new IOException("Unable to parse binary data with 
size" + byteArray.length, e.getCause());
+                    throw new IOException("Unable to parse binary data with 
size" + byteArray.length, e);
                 }
             }
         } else {
@@ -1600,28 +1595,16 @@ public class PutDatabaseRecord extends 
AbstractProcessor {
         return normalizedKeyColumnNames;
     }
 
-    private Optional<Boolean> supportsBatchUpdates = Optional.empty();
-
-    private void initializeSupportBatchUpdates(Connection connection) {
-        if (!supportsBatchUpdates.isPresent()) {
-            try {
-                final DatabaseMetaData dmd = connection.getMetaData();
-                supportsBatchUpdates = Optional.of(dmd.supportsBatchUpdates());
-                getLogger().debug(String.format("Connection 
supportsBatchUpdates is %s",
-                        supportsBatchUpdates.orElse(Boolean.FALSE)));
-            } catch (Exception ex) {
-                supportsBatchUpdates = Optional.of(Boolean.FALSE);
-                getLogger().debug(String.format("Exception while testing if 
connection supportsBatchUpdates due to %s - %s",
-                        ex.getClass().getName(), ex.getMessage()));
-            }
+    private boolean isSupportsBatchUpdates(Connection connection) {
+        try {
+            return connection.getMetaData().supportsBatchUpdates();
+        } catch (Exception ex) {
+            getLogger().debug(String.format("Exception while testing if 
connection supportsBatchUpdates due to %s - %s",
+                    ex.getClass().getName(), ex.getMessage()));
+            return false;
         }
     }
 
-    private boolean isSupportBatchUpdates(Connection connection) {
-        initializeSupportBatchUpdates(connection);
-        return supportsBatchUpdates.orElse(Boolean.FALSE);
-    }
-
     static class SchemaKey {
         private final String catalog;
         private final String schemaName;
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index dd004e7db7..b03197a731 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -81,6 +81,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -95,11 +96,12 @@ public class PutDatabaseRecordTest {
         // DISABLED test cases are used for single-run tests which are not 
parameterized
         DEFAULT_0(ENABLED, new TestCase(false, false, 0)),
         DEFAULT_1(DISABLED, new TestCase(false, false, 1)),
-        DEFAULT_2(DISABLED, new TestCase(false, false, 2)),
+        DEFAULT_2(DISABLED, new TestCase(null, false, 2)),
+        DEFAULT_5(DISABLED, new TestCase(null, false, 5)),
         DEFAULT_1000(DISABLED, new TestCase(false, false, 1000)),
 
         ROLLBACK_0(DISABLED, new TestCase(false, true, 0)),
-        ROLLBACK_1(DISABLED, new TestCase(false, true, 1)),
+        ROLLBACK_1(ENABLED, new TestCase(null, true, 1)),
         ROLLBACK_2(DISABLED, new TestCase(false, true, 2)),
         ROLLBACK_1000(ENABLED, new TestCase(false, true, 1000)),
 
@@ -200,7 +202,11 @@ public class PutDatabaseRecordTest {
         runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
         runner.enableControllerService(dbcp);
         runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
-        runner.setProperty(PutDatabaseRecord.AUTO_COMMIT, 
testCase.getAutoCommitAsString());
+        if (testCase.getAutoCommitAsString() == null) {
+            runner.removeProperty(PutDatabaseRecord.AUTO_COMMIT);
+        } else {
+            runner.setProperty(PutDatabaseRecord.AUTO_COMMIT, 
testCase.getAutoCommitAsString());
+        }
         runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, 
testCase.getRollbackOnFailureAsString());
         runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, 
testCase.getBatchSizeAsString());
     }
@@ -316,7 +322,7 @@ public class PutDatabaseRecordTest {
     }
 
     public void testInsertNonRequiredColumnsUnmatchedField() throws 
InitializationException, ProcessException {
-        setRunner(TestCaseEnum.DEFAULT_2.getTestCase());
+        setRunner(TestCaseEnum.DEFAULT_5.getTestCase());
 
         // Need to override the @Before method with a new processor that 
behaves badly
         processor = new PutDatabaseRecordUnmatchedField();
@@ -766,11 +772,29 @@ public class PutDatabaseRecordTest {
         runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql");
         runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, 
String.valueOf(allowMultipleStatements));
 
+        Supplier<Statement> spyStmt = createStatementSpy();
+
         final Map<String, String> attrs = new HashMap<>();
         attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql");
         runner.enqueue(new byte[0], attrs);
         runner.run();
 
+        final int maxBatchSize = 
runner.getProcessContext().getProperty(PutDatabaseRecord.MAX_BATCH_SIZE).asInteger();
+        assertNotNull(spyStmt.get());
+        if (sqlStatements.length <= 1) {
+            // When there is only 1 sql statement, then never use batching
+            verify(spyStmt.get(), times(0)).executeBatch();
+        } else if (maxBatchSize == 0) {
+            // When maxBatchSize is 0, verify that all statements were 
executed in a single executeBatch call
+            verify(spyStmt.get(), times(1)).executeBatch();
+        } else if (maxBatchSize == 1) {
+            // When maxBatchSize is 1, verify that executeBatch was never 
called
+            verify(spyStmt.get(), times(0)).executeBatch();
+        } else {
+            // When maxBatchSize > 1, verify that executeBatch was called at 
least once
+            verify(spyStmt.get(), atLeastOnce()).executeBatch();
+        }
+
         runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
         final Connection conn = dbcp.getConnection();
         final Statement stmt = conn.createStatement();
@@ -2380,6 +2404,19 @@ public class PutDatabaseRecordTest {
         return () -> spyStmt[0];
     }
 
+    private Supplier<Statement> createStatementSpy() {
+        final Statement[] spyStmt = new Statement[1];
+        final Answer<DelegatingConnection> answer = (inv) -> new 
DelegatingConnection((Connection) inv.callRealMethod()) {
+            @Override
+            public Statement createStatement() throws SQLException {
+                spyStmt[0] = spy(getDelegate().createStatement());
+                return spyStmt[0];
+            }
+        };
+        doAnswer(answer).when(dbcp).getConnection();
+        return () -> spyStmt[0];
+    }
+
     static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
         @Override
         SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String 
tableName, TableSchema tableSchema, DMLSettings settings) throws 
IllegalArgumentException {

Reply via email to