Repository: nifi Updated Branches: refs/heads/master 03d3b3961 -> b22500d0a
NIFI-2576 Allowing PutSQL to use timestamps in epoch or string format This closes #869. Signed-off-by: Bryan Bende <bbe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b22500d0 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b22500d0 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b22500d0 Branch: refs/heads/master Commit: b22500d0a322794aa8a3e95b570bb58e6d62d33e Parents: 03d3b39 Author: Peter Wicks <pwi...@micron.com> Authored: Mon Aug 15 15:50:47 2016 -0600 Committer: Bryan Bende <bbe...@apache.org> Committed: Tue Aug 16 10:26:24 2016 -0400 ---------------------------------------------------------------------- .../apache/nifi/processors/standard/PutSQL.java | 21 +++++++++- .../nifi/processors/standard/TestPutSQL.java | 43 ++++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b22500d0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 5b1a048..55d3855 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -54,6 +54,8 @@ import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.ArrayList; 
import java.util.BitSet; import java.util.Collections; @@ -154,6 +156,8 @@ public class PutSQL extends AbstractProcessor { private static final String FRAGMENT_INDEX_ATTR = "fragment.index"; private static final String FRAGMENT_COUNT_ATTR = "fragment.count"; + private static final Pattern LONG_PATTERN = Pattern.compile("^\\d{1,19}$"); + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> properties = new ArrayList<>(); @@ -611,6 +615,8 @@ public class PutSQL extends AbstractProcessor { setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType); } catch (final NumberFormatException nfe) { throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe); + } catch (ParseException pe) { + throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe); } } } @@ -729,7 +735,7 @@ public class PutSQL extends AbstractProcessor { * @param jdbcType the JDBC Type of the SQL parameter to set * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter */ - private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException { + private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException, ParseException { if (parameterValue == null) { stmt.setNull(parameterIndex, jdbcType); } else { @@ -768,7 +774,18 @@ public class PutSQL extends AbstractProcessor { stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); break; case Types.TIMESTAMP: - stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue))); + long lTimestamp=0L; + + 
if(LONG_PATTERN.matcher(parameterValue).matches()){ + lTimestamp = Long.parseLong(parameterValue); + }else { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); /* HH = 24-hour clock (0-23); the 12-hour 'hh' field parses hour 12 as 0 */ + java.util.Date parsedDate = dateFormat.parse(parameterValue); + lTimestamp = parsedDate.getTime(); + } + + stmt.setTimestamp(parameterIndex, new Timestamp(lTimestamp)); + break; case Types.CHAR: case Types.VARCHAR: http://git-wip-us.apache.org/repos/asf/nifi/blob/b22500d0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index b5ebca5..592d329 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -28,6 +28,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.Map; @@ -262,6 +264,47 @@ public class TestPutSQL { } @Test + public void testUsingTimestampValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate("CREATE TABLE TIMESTAMPTESTS (id integer primary key, ts1 timestamp, ts2 timestamp)"); + } + } + + 
runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + + final String arg2TS = "2001-01-01 23:01:01.001"; + final String art3TS = "2002-02-02 22:02:02.002"; + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS"); + java.util.Date parsedDate = dateFormat.parse(arg2TS); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("sql.args.1.type", String.valueOf(Types.TIMESTAMP)); + attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime())); + attributes.put("sql.args.2.type", String.valueOf(Types.TIMESTAMP)); + attributes.put("sql.args.2.value", art3TS); + + runner.enqueue("INSERT INTO TIMESTAMPTESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTESTS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(arg2TS, rs.getString(2)); + assertEquals(art3TS, rs.getString(3)); + assertFalse(rs.next()); + } + } + } + + @Test public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException { final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); runner.addControllerService("dbcp", service);