NIFI-1613 Initial version: improve value conversion for different SQL types. Added a new test and refactored the existing tests to reuse the DBCP service.
nifi-1613 Adding numeric and Date/time types conversion and test. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3b2e43b7 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3b2e43b7 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3b2e43b7 Branch: refs/heads/master Commit: 3b2e43b75c80be854cc854c3941e882c794c1d76 Parents: b603cb9 Author: Toivo Adams <toivo.ad...@gmail.com> Authored: Sun Mar 20 21:13:15 2016 +0200 Committer: Matt Burgess <mattyb...@apache.org> Committed: Wed Jul 12 17:02:55 2017 -0400 ---------------------------------------------------------------------- .../processors/standard/ConvertJSONToSQL.java | 68 ++++- .../standard/TestConvertJSONToSQL.java | 296 ++++++++----------- 2 files changed, 187 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/3b2e43b7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index 2eb9cad..ca92ad4 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -26,6 +26,7 @@ import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Types; import java.util.ArrayList; import java.util.HashMap; import 
java.util.HashSet; @@ -38,11 +39,11 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -478,10 +479,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { final Integer colSize = desc.getColumnSize(); final JsonNode fieldNode = rootNode.get(fieldName); if (!fieldNode.isNull()) { - String fieldValue = fieldNode.asText(); - if (colSize != null && fieldValue.length() > colSize) { - fieldValue = fieldValue.substring(0, colSize); - } + String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType); attributes.put("sql.args." + fieldCount + ".value", fieldValue); } } @@ -505,6 +503,61 @@ public class ConvertJSONToSQL extends AbstractProcessor { return sqlBuilder.toString(); } + /** + * Try to create correct SQL String representation of value. + * + */ + protected static String createSqlStringValue(final JsonNode fieldNode, final Integer colSize, final int sqlType) { + String fieldValue = fieldNode.asText(); + + switch (sqlType) { + + // only "true" is considered true, everything else is false + case Types.BOOLEAN: + switch (fieldValue==null?"":fieldValue) { + case "true": + fieldValue = "true"; + break; + + default: + fieldValue = "false"; + break; + } + break; + + // Don't truncate numeric types. + // Should we check value is indeed number and throw error if not? 
+ case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + case Types.REAL: + case Types.FLOAT: + case Types.DOUBLE: + case Types.DECIMAL: + case Types.NUMERIC: + break; + + // Don't truncate date and time types. + // Should we check value is indeed correct date and/or time and throw error if not? + // We assume date and time is already correct, but because ConvertJSONToSQL is often used together with PutSQL + // maybe we should assure PutSQL correctly understands date and time values. + // Currently PutSQL expect Long numeric values. But JSON usually uses ISO 8601, for example: 2012-04-23T18:25:43.511Z for dates. + case Types.DATE: + case Types.TIME: + case Types.TIMESTAMP: + break; + + default: + if (colSize != null && fieldValue.length() > colSize) { + fieldValue = fieldValue.substring(0, colSize); + } + break; + } + + return fieldValue; + } + private String generateUpdate(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys, final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns, final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) { @@ -599,10 +652,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { final JsonNode fieldNode = rootNode.get(fieldName); if (!fieldNode.isNull()) { - String fieldValue = rootNode.get(fieldName).asText(); - if (colSize != null && fieldValue.length() > colSize) { - fieldValue = fieldValue.substring(0, colSize); - } + String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType); attributes.put("sql.args." 
+ fieldCount + ".value", fieldValue); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/3b2e43b7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java index bc9d7f9..3f9c52c 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertJSONToSQL.java @@ -16,14 +16,21 @@ */ package org.apache.nifi.processors.standard; +import static org.junit.Assert.assertEquals; + import java.io.File; import java.io.IOException; import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Types; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; @@ -32,8 +39,12 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; import org.junit.BeforeClass; -import org.junit.Rule; +import org.junit.ClassRule; import 
org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -42,30 +53,36 @@ import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID public class TestConvertJSONToSQL { static String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; + static String createDifferentTypes = "CREATE TABLE DIFTYPES (id integer primary key, b boolean, f float, dbl double, dcml decimal, d date)"; + + @ClassRule + public static TemporaryFolder folder = new TemporaryFolder(); - @Rule - public TemporaryFolder folder = new TemporaryFolder(); + /** + * Setting up Connection pooling is expensive operation. + * So let's do this only once and reuse MockDBCPService in each test. + */ + static protected DBCPService service; @BeforeClass - public static void setup() { + public static void setupClass() throws ProcessException, SQLException { System.setProperty("derby.stream.error.file", "target/derby.log"); - } - - @Test - public void testInsert() throws InitializationException, ProcessException, SQLException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); final File tempDir = folder.getRoot(); final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); - + service = new MockDBCPService(dbDir.getAbsolutePath()); try (final Connection conn = service.getConnection()) { try (final Statement stmt = conn.createStatement()) { stmt.executeUpdate(createPersons); } } + } + @Test + public void testInsert() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); + + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); 
runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); @@ -163,18 +180,9 @@ public class TestConvertJSONToSQL { @Test public void testInsertWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); @@ -199,18 +207,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateWithNullValue() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -272,18 +271,9 @@ public class TestConvertJSONToSQL { @Test public void testMultipleInserts() throws InitializationException, ProcessException, SQLException, 
IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); @@ -343,18 +333,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateBasedOnPrimaryKey() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -414,18 +395,9 @@ public class TestConvertJSONToSQL { @Test public void testUnmappedFieldBehavior() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + 
runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); @@ -450,18 +422,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateBasedOnUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -523,18 +486,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - 
runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -561,18 +515,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateWithMissingFieldBasedOnCompoundUpdateKey() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -586,18 +531,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateWithMalformedJson() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -611,18 +547,9 @@ public class TestConvertJSONToSQL { @Test public 
void testInsertWithMissingField() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); @@ -636,20 +563,17 @@ public class TestConvertJSONToSQL { @Test public void testInsertWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); try (final Connection conn = service.getConnection()) { try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)"); + stmt.executeUpdate("CREATE TABLE PERSONS3 (id integer, name varchar(100), code integer, generated_key integer primary key)"); } } + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS3"); 
runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_COLUMN); runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); @@ -661,20 +585,17 @@ public class TestConvertJSONToSQL { @Test public void testInsertWithMissingColumnWarning() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); - runner.addControllerService("dbcp", service); - runner.enableControllerService(service); try (final Connection conn = service.getConnection()) { try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)"); + stmt.executeUpdate("CREATE TABLE PERSONS2 (id integer, name varchar(100), code integer, generated_key integer primary key)"); } } + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); - runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); + runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS2"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.WARNING_UNMATCHED_COLUMN); runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json")); @@ -691,24 +612,15 @@ public class TestConvertJSONToSQL { out.assertAttributeEquals("sql.args.3.type", String.valueOf(java.sql.Types.INTEGER)); out.assertAttributeEquals("sql.args.3.value", "48"); - out.assertContentEquals("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)"); + out.assertContentEquals("INSERT INTO 
PERSONS2 (ID, NAME, CODE) VALUES (?, ?, ?)"); } // End testInsertWithMissingColumnWarning() @Test public void testInsertWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)"); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT"); @@ -733,18 +645,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -759,18 +662,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateWithMissingColumnWarning() throws InitializationException, 
ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -797,18 +691,9 @@ public class TestConvertJSONToSQL { @Test public void testUpdateWithMissingColumnIgnore() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class); - final File tempDir = folder.getRoot(); - final File dbDir = new File(tempDir, "db"); - final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); runner.enableControllerService(service); - - try (final Connection conn = service.getConnection()) { - try (final Statement stmt = conn.createStatement()) { - stmt.executeUpdate(createPersons); - } - } - runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp"); runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS"); runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "UPDATE"); @@ -832,6 +717,81 @@ public class TestConvertJSONToSQL { } // End testUpdateWithMissingColumnIgnore() + /** + * Test create correct SQL String representation of value. + * Use PutSQL processor to verify converted value can be used and don't fail. 
+ */ + @Test + public void testCreateSqlStringValue() throws ProcessException, SQLException, JsonGenerationException, JsonMappingException, IOException, InitializationException { + final TestRunner putSqlRunner = TestRunners.newTestRunner(PutSQL.class); + + final AtomicInteger id = new AtomicInteger(20); + + putSqlRunner.addControllerService("dbcp", service); + putSqlRunner.enableControllerService(service); + putSqlRunner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false"); + putSqlRunner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + String tableName = "DIFTYPES"; + ObjectMapper mapper = new ObjectMapper(); + ResultSet colrs = null; + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createDifferentTypes); + } + colrs = conn.getMetaData().getColumns(null, null, tableName, "%"); + while (colrs.next()) { + final int sqlType = colrs.getInt("DATA_TYPE"); + final int colSize = colrs.getInt("COLUMN_SIZE"); + switch (sqlType) { + case Types.BOOLEAN: + String json = mapper.writeValueAsString("true"); + JsonNode fieldNode = mapper.readTree(json); + String booleanString = ConvertJSONToSQL.createSqlStringValue(fieldNode, colSize, sqlType); + assertEquals("true",booleanString); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(sqlType)); + attributes.put("sql.args.1.value", booleanString); + + byte[] data = ("INSERT INTO DIFTYPES (ID, B) VALUES (" + id.incrementAndGet() + ", ?)").getBytes(); + putSqlRunner.enqueue(data, attributes); + putSqlRunner.run(); + List<MockFlowFile> failed = putSqlRunner.getFlowFilesForRelationship(PutSQL.REL_FAILURE); + putSqlRunner.assertTransferCount(PutSQL.REL_SUCCESS, 1); + putSqlRunner.assertTransferCount(PutSQL.REL_FAILURE, 0); + putSqlRunner.clearTransferState(); + break; + + case Types.FLOAT: + case Types.DOUBLE: + json = mapper.writeValueAsString("78895654.6575"); + fieldNode = mapper.readTree(json); + 
String numberString = ConvertJSONToSQL.createSqlStringValue(fieldNode, colSize, sqlType); + assertEquals("78895654.6575",numberString); + + attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(sqlType)); + attributes.put("sql.args.1.value", numberString); + + data = ("INSERT INTO DIFTYPES (ID, dbl) VALUES (" + id.incrementAndGet() + ", ?)").getBytes(); + putSqlRunner.enqueue(data, attributes); + putSqlRunner.run(); + failed = putSqlRunner.getFlowFilesForRelationship(PutSQL.REL_FAILURE); + putSqlRunner.assertTransferCount(PutSQL.REL_SUCCESS, 1); + putSqlRunner.assertTransferCount(PutSQL.REL_FAILURE, 0); + putSqlRunner.clearTransferState(); + break; + + default: + break; + } + + } + + } + + } @Test public void testDelete() throws InitializationException, ProcessException, SQLException, IOException {