Repository: nifi
Updated Branches:
  refs/heads/master d5bce9197 -> 63f55d05b


NIFI-5724 make database connection autocommit configurable

making the database session autocommit value a configurable property

adding custom validation to the PutSQL processor so that 'Support Fragmented 
Transactions' and 'rollback on failure' cannot be set to true when the 
autocommit value has been set to true

fixing some style issues to conform to standards

This closes #3113.

Signed-off-by: Koji Kawamura <ijokaruma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/63f55d05
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/63f55d05
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/63f55d05

Branch: refs/heads/master
Commit: 63f55d05b4f3f83b9e9f2206f4129ae7a4ade569
Parents: d5bce91
Author: Vish Uma <visw...@gmail.com>
Authored: Fri Oct 26 15:32:46 2018 -0400
Committer: Koji Kawamura <ijokaruma...@apache.org>
Committed: Fri Nov 9 17:07:27 2018 +0900

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/PutSQL.java | 53 ++++++++++++++++++--
 1 file changed, 49 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/63f55d05/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 9957c2e..38134c2 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -28,6 +28,8 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -64,6 +66,7 @@ import java.sql.SQLNonTransientException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -74,6 +77,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 
+import static java.lang.String.format;
 import static 
org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError;
 
 @SupportsBatching
@@ -134,6 +138,14 @@ public class PutSQL extends 
AbstractSessionFactoryProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
+    static final PropertyDescriptor AUTO_COMMIT = new 
PropertyDescriptor.Builder()
+            .name("database-session-autocommit")
+            .displayName("Database Session AutoCommit")
+            .description("The autocommit mode to set on the database 
connection being used.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
     static final PropertyDescriptor SUPPORT_TRANSACTIONS = new 
PropertyDescriptor.Builder()
             .name("Support Fragmented Transactions")
             .description("If true, when a FlowFile is consumed by this 
Processor, the Processor will first check the fragment.identifier and 
fragment.count attributes of that FlowFile. "
@@ -189,6 +201,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor 
{
         properties.add(CONNECTION_POOL);
         properties.add(SQL_STATEMENT);
         properties.add(SUPPORT_TRANSACTIONS);
+        properties.add(AUTO_COMMIT);
         properties.add(TRANSACTION_TIMEOUT);
         properties.add(BATCH_SIZE);
         properties.add(OBTAIN_GENERATED_KEYS);
@@ -197,6 +210,34 @@ public class PutSQL extends 
AbstractSessionFactoryProcessor {
     }
 
     @Override
+    // Cross-property validation: when auto commit is enabled on the connection,
+    // neither SUPPORT_TRANSACTIONS nor ROLLBACK_ON_FAILURE may be 'true'.
+    // Returns one ValidationResult per conflicting property (empty when the
+    // configuration is consistent).
+    protected final Collection<ValidationResult> customValidate(ValidationContext context) {
+        final Collection<ValidationResult> results = new ArrayList<>();
+        // Boolean.parseBoolean is a case-insensitive match on "true" that treats null as false.
+        final boolean supportTransactions = Boolean.parseBoolean(context.getProperty(SUPPORT_TRANSACTIONS).getValue());
+        final boolean rollbackOnFailure = Boolean.parseBoolean(context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).getValue());
+        final boolean autoCommit = Boolean.parseBoolean(context.getProperty(AUTO_COMMIT).getValue());
+
+        if (autoCommit) {
+            if (supportTransactions) {
+                results.add(new ValidationResult.Builder()
+                        .subject(SUPPORT_TRANSACTIONS.getDisplayName())
+                        // Trailing space keeps the two sentences from running together.
+                        .explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'. "
+                                + "Transactions for batch updates cannot be supported when auto commit is set to 'true'",
+                                SUPPORT_TRANSACTIONS.getDisplayName(), AUTO_COMMIT.getDisplayName()))
+                        .build());
+            }
+            if (rollbackOnFailure) {
+                results.add(new ValidationResult.Builder()
+                        .subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName())
+                        .explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'. "
+                                + "Transaction rollbacks for batch updates cannot be supported when auto commit is set to 'true'",
+                                RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(), AUTO_COMMIT.getDisplayName()))
+                        .build());
+            }
+        }
+        return results;
+    }
+
+    @Override
     public Set<Relationship> getRelationships() {
         final Set<Relationship> rels = new HashSet<>();
         rels.add(REL_SUCCESS);
@@ -239,7 +280,10 @@ public class PutSQL extends 
AbstractSessionFactoryProcessor {
                 .getConnection(ff == null ? Collections.emptyMap() : 
ff.getAttributes());
         try {
             fc.originalAutoCommit = connection.getAutoCommit();
-            connection.setAutoCommit(false);
+            final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
+            if(fc.originalAutoCommit != autocommit) {
+                connection.setAutoCommit(autocommit);
+            }
         } catch (SQLException e) {
             throw new ProcessException("Failed to disable auto commit due to " 
+ e, e);
         }
@@ -521,9 +565,10 @@ public class PutSQL extends 
AbstractSessionFactoryProcessor {
 
         process.cleanup((c, s, fc, conn) -> {
             // make sure that we try to set the auto commit back to whatever 
it was.
-            if (fc.originalAutoCommit) {
+            final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
+            if (fc.originalAutoCommit != autocommit) {
                 try {
-                    conn.setAutoCommit(true);
+                    conn.setAutoCommit(fc.originalAutoCommit);
                 } catch (final SQLException se) {
                     getLogger().warn("Failed to reset autocommit due to {}", 
new Object[]{se});
                 }
@@ -670,7 +715,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor 
{
         int selectedNumFragments = 0;
         final BitSet bitSet = new BitSet();
 
-        BiFunction<String, Object[], IllegalArgumentException> illegal = (s, 
objects) -> new IllegalArgumentException(String.format(s, objects));
+        BiFunction<String, Object[], IllegalArgumentException> illegal = (s, 
objects) -> new IllegalArgumentException(format(s, objects));
 
         for (final FlowFile flowFile : flowFiles) {
             final String fragmentCount = 
flowFile.getAttribute(FRAGMENT_COUNT_ATTR);

Reply via email to