This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 8f50e90e6f5633376081a9fcfe260c4a6031ff85
Author: Vibhath Ileperuma <vibhatharunapr...@gmail.com>
AuthorDate: Sat Nov 27 20:43:51 2021 +0530

    NIFI-8605 Adding a new property for the ExecuteSQL and ExecuteSQLRecord processors to enable/disable auto commit
    Changed the default value of the auto-commit property to true
    Renamed the auto-commit property and added more detail to its description
    If auto commit is set to false, commit() is called for consistency
    Added unit tests
    Fixed the checkstyle issue of having more than 200 characters in a single line
    
    Signed-off-by: Matthew Burgess <mattyb...@apache.org>
    
    This closes #5554
---
 .../processors/standard/AbstractExecuteSQL.java    | 339 +++++++++++----------
 .../nifi/processors/standard/ExecuteSQL.java       |   1 +
 .../nifi/processors/standard/ExecuteSQLRecord.java |   1 +
 .../nifi/processors/standard/TestExecuteSQL.java   |  17 ++
 .../processors/standard/TestExecuteSQLRecord.java  |  17 ++
 5 files changed, 217 insertions(+), 158 deletions(-)
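
The motivation for the new property is the PostgreSQL JDBC driver, which only
streams results with a cursor when auto commit is disabled and a fetch size is
set; with auto commit enabled it buffers the entire ResultSet in memory. A
minimal standalone sketch of the pattern the processor now follows (the
connection URL, table name, and fetch size below are illustrative only):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class CursorFetchSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical connection URL and table; only the pattern matters.
            try (Connection con = DriverManager.getConnection("jdbc:postgresql://localhost/mydb")) {
                con.setAutoCommit(false);      // PostgreSQL streams rows only with auto commit off
                try (PreparedStatement st = con.prepareStatement("SELECT * FROM big_table")) {
                    st.setFetchSize(1000);     // rows per round trip instead of the whole result set
                    try (ResultSet rs = st.executeQuery()) {
                        while (rs.next()) {
                            // process each row as it is fetched
                        }
                    }
                }
                con.commit();                  // commit before close, as the processor now does
            }
        }
    }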

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index bd6962c..55a4326 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -169,6 +169,22 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
+    public static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder()
+            .name("esql-auto-commit")
+            .displayName("Set Auto Commit")
+            .description("Enables or disables the auto-commit functionality of the DB connection. The default value is 'true'. " +
+                    "The default works with most JDBC drivers, and the setting has no effect in most cases " +
+                    "since this processor is used to read data. " +
+                    "However, some JDBC drivers, such as the PostgreSQL driver, require auto commit to be disabled " +
+                    "to limit the number of result rows fetched at a time. " +
+                    "When auto commit is enabled, the PostgreSQL driver loads the whole result set into memory at once. " +
+                    "This can lead to high memory usage when executing queries that fetch large data sets. " +
+                    "More details on this behaviour of the PostgreSQL driver can be found at https://jdbc.postgresql.org/documentation/head/query.html. ")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+
     protected List<PropertyDescriptor> propDescriptors;
 
     protected DBCPService dbcpService;
@@ -236,195 +252,202 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
         }
 
         int resultCount = 0;
-        try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
-             final PreparedStatement st = con.prepareStatement(selectQuery)) {
-            if (fetchSize != null && fetchSize > 0) {
-                try {
-                    st.setFetchSize(fetchSize);
-                } catch (SQLException se) {
-                    // Not all drivers support this, just log the error (at debug level) and move on
-                    logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
+        try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes())) {
+            con.setAutoCommit(context.getProperty(AUTO_COMMIT).asBoolean());
+            try (final PreparedStatement st = con.prepareStatement(selectQuery)) {
+                if (fetchSize != null && fetchSize > 0) {
+                    try {
+                        st.setFetchSize(fetchSize);
+                    } catch (SQLException se) {
+                        // Not all drivers support this, just log the error (at debug level) and move on
+                        logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
+                    }
+                }
+                st.setQueryTimeout(queryTimeout); // timeout in seconds
+
+                // Execute pre-query, throw exception and cleanup Flow Files if fail
+                Pair<String, SQLException> failure = executeConfigStatements(con, preQueries);
+                if (failure != null) {
+                    // In case of failure, assigning config query to "selectQuery" to follow current error handling
+                    selectQuery = failure.getLeft();
+                    throw failure.getRight();
                 }
-            }
-            st.setQueryTimeout(queryTimeout); // timeout in seconds
-
-            // Execute pre-query, throw exception and cleanup Flow Files if fail
-            Pair<String,SQLException> failure = executeConfigStatements(con, preQueries);
-            if (failure != null) {
-                // In case of failure, assigning config query to "selectQuery" to follow current error handling
-                selectQuery = failure.getLeft();
-                throw failure.getRight();
-            }
 
-            if (fileToProcess != null) {
-                JdbcCommon.setParameters(st, fileToProcess.getAttributes());
-            }
-            logger.debug("Executing query {}", new Object[]{selectQuery});
+                if (fileToProcess != null) {
+                    JdbcCommon.setParameters(st, fileToProcess.getAttributes());
+                }
+                logger.debug("Executing query {}", new Object[]{selectQuery});
 
-            int fragmentIndex = 0;
-            final String fragmentId = UUID.randomUUID().toString();
+                int fragmentIndex = 0;
+                final String fragmentId = UUID.randomUUID().toString();
 
-            final StopWatch executionTime = new StopWatch(true);
+                final StopWatch executionTime = new StopWatch(true);
 
-            boolean hasResults = st.execute();
+                boolean hasResults = st.execute();
 
-            long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
+                long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
 
-            boolean hasUpdateCount = st.getUpdateCount() != -1;
+                boolean hasUpdateCount = st.getUpdateCount() != -1;
 
-            Map<String, String> inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes();
-            String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key());
-            while (hasResults || hasUpdateCount) {
-                //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
-                if (hasResults) {
-                    final AtomicLong nrOfRows = new AtomicLong(0L);
+                Map<String, String> inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes();
+                String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key());
+                while (hasResults || hasUpdateCount) {
+                    //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
+                    if (hasResults) {
+                        final AtomicLong nrOfRows = new AtomicLong(0L);
 
-                    try {
-                        final ResultSet resultSet = st.getResultSet();
-                        do {
-                            final StopWatch fetchTime = new StopWatch(true);
-
-                            FlowFile resultSetFF;
-                            if (fileToProcess == null) {
-                                resultSetFF = session.create();
-                            } else {
-                                resultSetFF = session.create(fileToProcess);
-                            }
+                        try {
+                            final ResultSet resultSet = st.getResultSet();
+                            do {
+                                final StopWatch fetchTime = new StopWatch(true);
 
-                            if (inputFileAttrMap != null) {
-                                resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap);
-                            }
+                                FlowFile resultSetFF;
+                                if (fileToProcess == null) {
+                                    resultSetFF = session.create();
+                                } else {
+                                    resultSetFF = session.create(fileToProcess);
+                                }
+
+                                if (inputFileAttrMap != null) {
+                                    resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap);
+                                }
 
 
-                            try {
-                                resultSetFF = session.write(resultSetFF, out -> {
-                                    try {
-                                        nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null));
-                                    } catch (Exception e) {
-                                        throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
+                                try {
+                                    resultSetFF = session.write(resultSetFF, out -> {
+                                        try {
+                                            nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null));
+                                        } catch (Exception e) {
+                                            throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
+                                        }
+                                    });
+
+                                    long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
+
+                                    // set attributes
+                                    final Map<String, String> attributesToAdd = new HashMap<>();
+                                    attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                                    attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+                                    attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+                                    attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
+                                    attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
+                                    if (inputFileUUID != null) {
+                                        attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
                                     }
-                                });
-
-                                long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
-
-                                // set attributes
-                                final Map<String, String> attributesToAdd = new HashMap<>();
-                                attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-                                attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
-                                attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
-                                attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
-                                attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
-                                if (inputFileUUID != null) {
-                                    attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
-                                }
-                                attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
-                                resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
-                                sqlWriter.updateCounters(session);
-
-                                // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
-                                if (maxRowsPerFlowFile > 0) {
-                                    // if row count is zero and this is not the first fragment, drop it instead of committing it.
-                                    if (nrOfRows.get() == 0 && fragmentIndex > 0) {
-                                        session.remove(resultSetFF);
-                                        break;
+                                    attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
+                                    resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
+                                    sqlWriter.updateCounters(session);
+
+                                    // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
+                                    if (maxRowsPerFlowFile > 0) {
+                                        // if row count is zero and this is not the first fragment, drop it instead of committing it.
+                                        if (nrOfRows.get() == 0 && fragmentIndex > 0) {
+                                            session.remove(resultSetFF);
+                                            break;
+                                        }
+
+                                        resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
+                                        resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
                                     }
 
-                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
-                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
-                                }
-
-                                logger.info("{} contains {} records; transferring to 'success'", new Object[]{resultSetFF, nrOfRows.get()});
+                                    logger.info("{} contains {} records; transferring to 'success'", new Object[]{resultSetFF, nrOfRows.get()});
 
-                                // Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise
-                                if(context.hasIncomingConnection()) {
-                                    session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
-                                } else {
-                                    session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
-                                }
-                                resultSetFlowFiles.add(resultSetFF);
-
-                                // If we've reached the batch size, send out the flow files
-                                if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
-                                    session.transfer(resultSetFlowFiles, REL_SUCCESS);
-                                    // Need to remove the original input file if it exists
-                                    if (fileToProcess != null) {
-                                        session.remove(fileToProcess);
-                                        fileToProcess = null;
+                                    // Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise
+                                    if (context.hasIncomingConnection()) {
+                                        session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
+                                    } else {
+                                        session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
+                                    }
+                                    resultSetFlowFiles.add(resultSetFF);
+
+                                    // If we've reached the batch size, send out the flow files
+                                    if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
+                                        session.transfer(resultSetFlowFiles, REL_SUCCESS);
+                                        // Need to remove the original input file if it exists
+                                        if (fileToProcess != null) {
+                                            session.remove(fileToProcess);
+                                            fileToProcess = null;
+                                        }
+
+                                        session.commitAsync();
+                                        resultSetFlowFiles.clear();
                                     }
 
-                                    session.commitAsync();
-                                    resultSetFlowFiles.clear();
+                                    fragmentIndex++;
+                                } catch (Exception e) {
+                                    // Remove any result set flow file(s) and propagate the exception
+                                    session.remove(resultSetFF);
+                                    session.remove(resultSetFlowFiles);
+                                    if (e instanceof ProcessException) {
+                                        throw (ProcessException) e;
+                                    } else {
+                                        throw new ProcessException(e);
+                                    }
                                 }
+                            } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
 
-                                fragmentIndex++;
-                            } catch (Exception e) {
-                                // Remove any result set flow file(s) and propagate the exception
-                                session.remove(resultSetFF);
-                                session.remove(resultSetFlowFiles);
-                                if (e instanceof ProcessException) {
-                                    throw (ProcessException) e;
-                                } else {
-                                    throw new ProcessException(e);
+                            // If we are splitting results but not outputting batches, set count on all FlowFiles
+                            if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
+                                for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                                    resultSetFlowFiles.set(i,
+                                            session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
                                 }
                             }
-                        } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
-
-                        // If we are splitting results but not outputting batches, set count on all FlowFiles
-                        if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
-                            for (int i = 0; i < resultSetFlowFiles.size(); i++) {
-                                resultSetFlowFiles.set(i,
-                                        session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
-                            }
+                        } catch (final SQLException e) {
+                            throw new ProcessException(e);
                         }
-                    } catch (final SQLException e) {
-                        throw new ProcessException(e);
+
+                        resultCount++;
                     }
 
-                    resultCount++;
+                    // are there anymore result sets?
+                    try {
+                        hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
+                        hasUpdateCount = st.getUpdateCount() != -1;
+                    } catch (SQLException ex) {
+                        hasResults = false;
+                        hasUpdateCount = false;
+                    }
                 }
 
-                // are there anymore result sets?
-                try {
-                    hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
-                    hasUpdateCount = st.getUpdateCount() != -1;
-                } catch (SQLException ex) {
-                    hasResults = false;
-                    hasUpdateCount = false;
+                // Execute post-query, throw exception and cleanup Flow Files if fail
+                failure = executeConfigStatements(con, postQueries);
+                if (failure != null) {
+                    selectQuery = failure.getLeft();
+                    resultSetFlowFiles.forEach(ff -> session.remove(ff));
+                    throw failure.getRight();
                 }
-            }
 
-            // Execute post-query, throw exception and cleanup Flow Files if fail
-            failure = executeConfigStatements(con, postQueries);
-            if (failure != null) {
-                selectQuery = failure.getLeft();
-                resultSetFlowFiles.forEach(ff -> session.remove(ff));
-                throw failure.getRight();
-            }
-
-            // Transfer any remaining files to SUCCESS
-            session.transfer(resultSetFlowFiles, REL_SUCCESS);
-            resultSetFlowFiles.clear();
+                // If the auto commit is set to false, commit() is called for consistency
+                if (!con.getAutoCommit()) {
+                    con.commit();
+                }
 
-            //If we had at least one result then it's OK to drop the original file, but if we had no results then
-            //  pass the original flow file down the line to trigger downstream processors
-            if (fileToProcess != null) {
-                if (resultCount > 0) {
-                    session.remove(fileToProcess);
-                } else {
-                    fileToProcess = session.write(fileToProcess, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
-                    fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
-                    fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
-                    session.transfer(fileToProcess, REL_SUCCESS);
+                // Transfer any remaining files to SUCCESS
+                session.transfer(resultSetFlowFiles, REL_SUCCESS);
+                resultSetFlowFiles.clear();
+
+                //If we had at least one result then it's OK to drop the original file, but if we had no results then
+                //  pass the original flow file down the line to trigger downstream processors
+                if (fileToProcess != null) {
+                    if (resultCount > 0) {
+                        session.remove(fileToProcess);
+                    } else {
+                        fileToProcess = session.write(fileToProcess, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
+                        fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
+                        fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
+                        session.transfer(fileToProcess, REL_SUCCESS);
+                    }
+                } else if (resultCount == 0) {
+                    //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
+                    // Then generate an empty Output FlowFile
+                    FlowFile resultSetFF = session.create();
+
+                    resultSetFF = session.write(resultSetFF, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
+                    resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
+                    resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
+                    session.transfer(resultSetFF, REL_SUCCESS);
                 }
-            } else if (resultCount == 0) {
-                //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
-                // Then generate an empty Output FlowFile
-                FlowFile resultSetFF = session.create();
-
-                resultSetFF = session.write(resultSetFF, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
-                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
-                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
-                session.transfer(resultSetFF, REL_SUCCESS);
             }
         } catch (final ProcessException | SQLException e) {
             //If we had at least one result then it's OK to drop the original file, but if we had no results then
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index b903e46..cc819db 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -134,6 +134,7 @@ public class ExecuteSQL extends AbstractExecuteSQL {
         pds.add(MAX_ROWS_PER_FLOW_FILE);
         pds.add(OUTPUT_BATCH_SIZE);
         pds.add(FETCH_SIZE);
+        pds.add(AUTO_COMMIT);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
index 76eeaae..2ebddac 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -140,6 +140,7 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL {
         pds.add(MAX_ROWS_PER_FLOW_FILE);
         pds.add(OUTPUT_BATCH_SIZE);
         pds.add(FETCH_SIZE);
+        pds.add(AUTO_COMMIT);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 16cd1d5..7227ce2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -176,6 +176,18 @@ public class TestExecuteSQL {
     }
 
     @Test
+    public void testAutoCommitFalse() throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        runner.setProperty(ExecuteSQL.AUTO_COMMIT, "false");
+        invokeOnTrigger(null, QUERY_WITHOUT_EL, true, null, false);
+    }
+
+    @Test
+    public void testAutoCommitTrue() throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        runner.setProperty(ExecuteSQL.AUTO_COMMIT, "true");
+        invokeOnTrigger(null, QUERY_WITHOUT_EL, true, null, false);
+    }
+
+    @Test
     public void testWithNullIntColumn() throws SQLException {
         // remove previous test database, if any
         final File dbLocation = new File(DB_LOCATION);
@@ -556,6 +568,11 @@ public class TestExecuteSQL {
         SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100);
         LOGGER.info("test data loaded");
 
+        //commit loaded data if auto-commit is disabled
+        if (!con.getAutoCommit()) {
+            con.commit();
+        }
+
         // ResultSet size will be 1x200x100 = 20 000 rows
         // because of where PER.ID = ${person.id}
         final int nrOfRows = 20000;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index fe17e94..edf013f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -170,6 +170,18 @@ public class TestExecuteSQLRecord {
     }
 
     @Test
+    public void testAutoCommitFalse() throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        runner.setProperty(ExecuteSQL.AUTO_COMMIT, "false");
+        invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, true, null, false);
+    }
+
+    @Test
+    public void testAutoCommitTrue() throws InitializationException, ClassNotFoundException, SQLException, IOException {
+        runner.setProperty(ExecuteSQL.AUTO_COMMIT, "true");
+        invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, true, null, false);
+    }
+
+    @Test
     public void testWithOutputBatching() throws InitializationException, SQLException {
         // remove previous test database, if any
         final File dbLocation = new File(DB_LOCATION);
@@ -545,6 +557,11 @@ public class TestExecuteSQLRecord {
         SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100);
         LOGGER.info("test data loaded");
 
+        //commit loaded data if auto-commit is disabled
+        if (!con.getAutoCommit()) {
+            con.commit();
+        }
+
         // ResultSet size will be 1x200x100 = 20 000 rows
         // because of where PER.ID = ${person.id}
         final int nrOfRows = 20000;
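
For reference, the new property is set in tests like any other processor
property via the nifi-mock TestRunner. A minimal sketch (the controller
service id "dbcp" and the query are illustrative; a DBCPService must already
be registered under that id, as in the existing tests):

    TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
    runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
    runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM persons");
    runner.setProperty(ExecuteSQL.AUTO_COMMIT, "false"); // the property added by this commit
    runner.run();
    runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS);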
