NIFI-1613 Initial version, try to improve conversion for different SQL types. 
New test and refactored existing test to reuse DBCP service.

nifi-1613 Adding numeric and Date/time types conversion and test.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3b2e43b7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3b2e43b7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3b2e43b7

Branch: refs/heads/master
Commit: 3b2e43b75c80be854cc854c3941e882c794c1d76
Parents: b603cb9
Author: Toivo Adams <toivo.ad...@gmail.com>
Authored: Sun Mar 20 21:13:15 2016 +0200
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Wed Jul 12 17:02:55 2017 -0400

----------------------------------------------------------------------
 .../processors/standard/ConvertJSONToSQL.java   |  68 ++++-
 .../standard/TestConvertJSONToSQL.java          | 296 ++++++++-----------
 2 files changed, 187 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3b2e43b7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index 2eb9cad..ca92ad4 100755
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -26,6 +26,7 @@ import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,11 +39,11 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -478,10 +479,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                 final Integer colSize = desc.getColumnSize();
                 final JsonNode fieldNode = rootNode.get(fieldName);
                 if (!fieldNode.isNull()) {
-                    String fieldValue = fieldNode.asText();
-                    if (colSize != null && fieldValue.length() > colSize) {
-                        fieldValue = fieldValue.substring(0, colSize);
-                    }
+                    String fieldValue = createSqlStringValue(fieldNode, 
colSize, sqlType);
                     attributes.put("sql.args." + fieldCount + ".value", 
fieldValue);
                 }
             }
@@ -505,6 +503,61 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         return sqlBuilder.toString();
     }
 
+    /**
+     *  Try to create correct SQL String representation of value.
+     *
+     */
+    protected static String createSqlStringValue(final JsonNode fieldNode, 
final Integer colSize, final int sqlType) {
+        String fieldValue = fieldNode.asText();
+
+        switch (sqlType) {
+
+        // only "true" is considered true, everything else is false
+        case Types.BOOLEAN:
+            switch (fieldValue==null?"":fieldValue) {
+            case "true":
+                fieldValue = "true";
+                break;
+
+            default:
+                fieldValue = "false";
+                break;
+            }
+            break;
+
+        // Don't truncate numeric types.
+        // Should we check value is indeed number and throw error if not?
+        case Types.TINYINT:
+        case Types.SMALLINT:
+        case Types.INTEGER:
+        case Types.BIGINT:
+        case Types.REAL:
+        case Types.FLOAT:
+        case Types.DOUBLE:
+        case Types.DECIMAL:
+        case Types.NUMERIC:
+            break;
+
+        // Don't truncate date and time types.
+        // Should we check value is indeed correct date and/or time and throw 
error if not?
+        // We assume date and time is already correct, but because 
ConvertJSONToSQL is often used together with PutSQL
+        // maybe we should assure PutSQL correctly understands date and time 
values.
+        // Currently PutSQL expect Long numeric values. But JSON usually uses 
ISO 8601, for example: 2012-04-23T18:25:43.511Z for dates.
+        case Types.DATE:
+        case Types.TIME:
+        case Types.TIMESTAMP:
+            break;
+
+        default:
+            if (colSize != null && fieldValue.length() > colSize) {
+                fieldValue = fieldValue.substring(0, colSize);
+            }
+            break;
+        }
+
+        return fieldValue;
+    }
+
     private String generateUpdate(final JsonNode rootNode, final Map<String, 
String> attributes, final String tableName, final String updateKeys,
                                   final TableSchema schema, final boolean 
translateFieldNames, final boolean ignoreUnmappedFields, final boolean 
failUnmappedColumns,
                                   final boolean warningUnmappedColumns, 
boolean escapeColumnNames, boolean quoteTableName) {
@@ -599,10 +652,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
             final JsonNode fieldNode = rootNode.get(fieldName);
             if (!fieldNode.isNull()) {
-                String fieldValue = rootNode.get(fieldName).asText();
-                if (colSize != null && fieldValue.length() > colSize) {
-                    fieldValue = fieldValue.substring(0, colSize);
-                }
+                String fieldValue = createSqlStringValue(fieldNode, colSize, 
sqlType);
                 attributes.put("sql.args." + fieldCount + ".value", 
fieldValue);
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3b2e43b7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
index bc9d7f9..3f9c52c 100755
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java
@@ -16,14 +16,21 @@
  */
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Types;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
@@ -32,8 +39,12 @@ import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
@@ -42,30 +53,36 @@ import static 
org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID
 
 public class TestConvertJSONToSQL {
     static String createPersons = "CREATE TABLE PERSONS (id integer primary 
key, name varchar(100), code integer)";
+    static String createDifferentTypes = "CREATE TABLE DIFTYPES (id integer 
primary key, b boolean, f float, dbl double, dcml decimal, d date)";
+
+    @ClassRule
+    public static TemporaryFolder folder = new TemporaryFolder();
 
-    @Rule
-    public TemporaryFolder folder = new TemporaryFolder();
+    /**
+     * Setting up Connection pooling is expensive operation.
+     * So let's do this only once and reuse MockDBCPService in each test.
+     */
+    static protected DBCPService service;
 
     @BeforeClass
-    public static void setup() {
+    public static void setupClass() throws ProcessException, SQLException {
         System.setProperty("derby.stream.error.file", "target/derby.log");
-    }
-
-    @Test
-    public void testInsert() throws InitializationException, ProcessException, 
SQLException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
         final File tempDir = folder.getRoot();
         final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
-
+        service = new MockDBCPService(dbDir.getAbsolutePath());
         try (final Connection conn = service.getConnection()) {
             try (final Statement stmt = conn.createStatement()) {
                 stmt.executeUpdate(createPersons);
             }
         }
+    }
 
+    @Test
+    public void testInsert() throws InitializationException, ProcessException, 
SQLException, IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
+
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
@@ -163,18 +180,9 @@ public class TestConvertJSONToSQL {
     @Test
     public void testInsertWithNullValue() throws InitializationException, 
ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
@@ -199,18 +207,9 @@ public class TestConvertJSONToSQL {
     @Test
     public void testUpdateWithNullValue() throws InitializationException, 
ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@@ -272,18 +271,9 @@ public class TestConvertJSONToSQL {
     @Test
     public void testMultipleInserts() throws InitializationException, 
ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
@@ -343,18 +333,9 @@ public class TestConvertJSONToSQL {
     @Test
     public void testUpdateBasedOnPrimaryKey() throws InitializationException, 
ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@@ -414,18 +395,9 @@ public class TestConvertJSONToSQL {
     @Test
     public void testUnmappedFieldBehavior() throws InitializationException, 
ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
@@ -450,18 +422,9 @@ public class TestConvertJSONToSQL {
     @Test
     public void testUpdateBasedOnUpdateKey() throws InitializationException, 
ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@@ -523,18 +486,9 @@ public class TestConvertJSONToSQL {
     @Test
     public void testUpdateBasedOnCompoundUpdateKey() throws 
InitializationException, ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@@ -561,18 +515,9 @@ public class TestConvertJSONToSQL {
     @Test
     public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws 
InitializationException, ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@@ -586,18 +531,9 @@ public class TestConvertJSONToSQL {
     @Test
     public void testUpdateWithMalformedJson() throws InitializationException, 
ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@@ -611,18 +547,9 @@ public class TestConvertJSONToSQL {
     @Test
     public void testInsertWithMissingField() throws InitializationException, 
ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
@@ -636,20 +563,17 @@ public class TestConvertJSONToSQL {
     @Test
     public void testInsertWithMissingColumnFail() throws 
InitializationException, ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
 
         try (final Connection conn = service.getConnection()) {
             try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name 
varchar(100), code integer, generated_key integer primary key)");
+                stmt.executeUpdate("CREATE TABLE PERSONS3 (id integer, name 
varchar(100), code integer, generated_key integer primary key)");
             }
         }
 
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
-        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS3");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
         runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, 
ConvertJSONToSQL.FAIL_UNMATCHED_COLUMN);
         
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
@@ -661,20 +585,17 @@ public class TestConvertJSONToSQL {
     @Test
     public void testInsertWithMissingColumnWarning() throws 
InitializationException, ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
-        runner.addControllerService("dbcp", service);
-        runner.enableControllerService(service);
 
         try (final Connection conn = service.getConnection()) {
             try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name 
varchar(100), code integer, generated_key integer primary key)");
+                stmt.executeUpdate("CREATE TABLE PERSONS2 (id integer, name 
varchar(100), code integer, generated_key integer primary key)");
             }
         }
 
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
-        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
+        runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS2");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
         runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, 
ConvertJSONToSQL.WARNING_UNMATCHED_COLUMN);
         
runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
@@ -691,24 +612,15 @@ public class TestConvertJSONToSQL {
         out.assertAttributeEquals("sql.args.3.type", 
String.valueOf(java.sql.Types.INTEGER));
         out.assertAttributeEquals("sql.args.3.value", "48");
 
-        out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES 
(?, ?, ?)");
+        out.assertContentEquals("INSERT INTO PERSONS2 (ID, NAME, CODE) VALUES 
(?, ?, ?)");
     } // End testInsertWithMissingColumnWarning()
 
     @Test
     public void testInsertWithMissingColumnIgnore() throws 
InitializationException, ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name 
varchar(100), code integer, generated_key integer primary key)");
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
@@ -733,18 +645,9 @@ public class TestConvertJSONToSQL {
     @Test
     public void testUpdateWithMissingColumnFail() throws 
InitializationException, ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@@ -759,18 +662,9 @@ public class TestConvertJSONToSQL {
     @Test
     public void testUpdateWithMissingColumnWarning() throws 
InitializationException, ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@@ -797,18 +691,9 @@ public class TestConvertJSONToSQL {
     @Test
     public void testUpdateWithMissingColumnIgnore() throws 
InitializationException, ProcessException, SQLException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(ConvertJSONToSQL.class);
-        final File tempDir = folder.getRoot();
-        final File dbDir = new File(tempDir, "db");
-        final DBCPService service = new 
MockDBCPService(dbDir.getAbsolutePath());
+
         runner.addControllerService("dbcp", service);
         runner.enableControllerService(service);
-
-        try (final Connection conn = service.getConnection()) {
-            try (final Statement stmt = conn.createStatement()) {
-                stmt.executeUpdate(createPersons);
-            }
-        }
-
         runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
         runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
         runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE");
@@ -832,6 +717,81 @@ public class TestConvertJSONToSQL {
 
     } // End testUpdateWithMissingColumnIgnore()
 
+    /**
+     *  Test create correct SQL String representation of value.
+     *  Use PutSQL processor to verify converted value can be used and don't 
fail.
+     */
+    @Test
+    public void testCreateSqlStringValue() throws ProcessException, 
SQLException, JsonGenerationException, JsonMappingException, IOException, 
InitializationException {
+        final TestRunner putSqlRunner = 
TestRunners.newTestRunner(PutSQL.class);
+
+        final AtomicInteger id = new AtomicInteger(20);
+
+        putSqlRunner.addControllerService("dbcp", service);
+        putSqlRunner.enableControllerService(service);
+        putSqlRunner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
+        putSqlRunner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+        String tableName = "DIFTYPES";
+        ObjectMapper mapper = new ObjectMapper();
+        ResultSet colrs = null;
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate(createDifferentTypes);
+            }
+            colrs = conn.getMetaData().getColumns(null, null, tableName, "%");
+            while (colrs.next()) {
+                final int sqlType = colrs.getInt("DATA_TYPE");
+                final int colSize = colrs.getInt("COLUMN_SIZE");
+                switch (sqlType) {
+                case Types.BOOLEAN:
+                    String json = mapper.writeValueAsString("true");
+                    JsonNode fieldNode = mapper.readTree(json);
+                    String booleanString = 
ConvertJSONToSQL.createSqlStringValue(fieldNode, colSize, sqlType);
+                    assertEquals("true",booleanString);
+
+                    Map<String, String> attributes = new HashMap<>();
+                    attributes.put("sql.args.1.type", String.valueOf(sqlType));
+                    attributes.put("sql.args.1.value", booleanString);
+
+                    byte[] data = ("INSERT INTO DIFTYPES (ID, B) VALUES (" + 
id.incrementAndGet() + ", ?)").getBytes();
+                    putSqlRunner.enqueue(data, attributes);
+                    putSqlRunner.run();
+                    List<MockFlowFile> failed = 
putSqlRunner.getFlowFilesForRelationship(PutSQL.REL_FAILURE);
+                    putSqlRunner.assertTransferCount(PutSQL.REL_SUCCESS, 1);
+                    putSqlRunner.assertTransferCount(PutSQL.REL_FAILURE, 0);
+                    putSqlRunner.clearTransferState();
+                    break;
+
+                case Types.FLOAT:
+                case Types.DOUBLE:
+                    json = mapper.writeValueAsString("78895654.6575");
+                    fieldNode = mapper.readTree(json);
+                    String numberString = 
ConvertJSONToSQL.createSqlStringValue(fieldNode, colSize, sqlType);
+                    assertEquals("78895654.6575",numberString);
+
+                    attributes = new HashMap<>();
+                    attributes.put("sql.args.1.type", String.valueOf(sqlType));
+                    attributes.put("sql.args.1.value", numberString);
+
+                    data = ("INSERT INTO DIFTYPES (ID, dbl) VALUES (" + 
id.incrementAndGet() + ", ?)").getBytes();
+                    putSqlRunner.enqueue(data, attributes);
+                    putSqlRunner.run();
+                    failed = 
putSqlRunner.getFlowFilesForRelationship(PutSQL.REL_FAILURE);
+                    putSqlRunner.assertTransferCount(PutSQL.REL_SUCCESS, 1);
+                    putSqlRunner.assertTransferCount(PutSQL.REL_FAILURE, 0);
+                    putSqlRunner.clearTransferState();
+                    break;
+
+                default:
+                    break;
+                }
+
+            }
+
+        }
+
+    }
 
     @Test
     public void testDelete() throws InitializationException, ProcessException, 
SQLException, IOException {

Reply via email to