Repository: nifi
Updated Branches:
  refs/heads/master 03d3b3961 -> b22500d0a


NIFI-2576 Allowing PutSQL to use timestamps in epoch or string format

This closes #869.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b22500d0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b22500d0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b22500d0

Branch: refs/heads/master
Commit: b22500d0a322794aa8a3e95b570bb58e6d62d33e
Parents: 03d3b39
Author: Peter Wicks <pwi...@micron.com>
Authored: Mon Aug 15 15:50:47 2016 -0600
Committer: Bryan Bende <bbe...@apache.org>
Committed: Tue Aug 16 10:26:24 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/PutSQL.java | 21 +++++++++-
 .../nifi/processors/standard/TestPutSQL.java    | 43 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b22500d0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 5b1a048..55d3855 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -54,6 +54,8 @@ import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
@@ -154,6 +156,8 @@ public class PutSQL extends AbstractProcessor {
     private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
     private static final String FRAGMENT_COUNT_ATTR = "fragment.count";
 
+    private static final Pattern LONG_PATTERN = Pattern.compile("^\\d{1,19}$");
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -611,6 +615,8 @@ public class PutSQL extends AbstractProcessor {
                     setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType);
                 } catch (final NumberFormatException nfe) {
                     throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe);
+                } catch (ParseException pe) {
+                    throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe);
                 }
             }
         }
@@ -729,7 +735,7 @@ public class PutSQL extends AbstractProcessor {
      * @param jdbcType the JDBC Type of the SQL parameter to set
      * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter
      */
-    private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException {
+    private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType) throws SQLException, ParseException {
         if (parameterValue == null) {
             stmt.setNull(parameterIndex, jdbcType);
         } else {
@@ -768,7 +774,18 @@ public class PutSQL extends AbstractProcessor {
                     stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue)));
                     break;
                 case Types.TIMESTAMP:
-                    stmt.setTimestamp(parameterIndex, new Timestamp(Long.parseLong(parameterValue)));
+                    long lTimestamp=0L;
+
+                    if(LONG_PATTERN.matcher(parameterValue).matches()){
+                        lTimestamp = Long.parseLong(parameterValue);
+                    }else {
+                        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS");
+                        java.util.Date parsedDate = dateFormat.parse(parameterValue);
+                        lTimestamp = parsedDate.getTime();
+                    }
+
+                    stmt.setTimestamp(parameterIndex, new Timestamp(lTimestamp));
+
                     break;
                 case Types.CHAR:
                 case Types.VARCHAR:

http://git-wip-us.apache.org/repos/asf/nifi/blob/b22500d0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
index b5ebca5..592d329 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -28,6 +28,8 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -262,6 +264,47 @@ public class TestPutSQL {
     }
 
     @Test
+    public void testUsingTimestampValuesEpochAndString() throws InitializationException, ProcessException, SQLException, IOException, ParseException {
+        final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                stmt.executeUpdate("CREATE TABLE TIMESTAMPTESTS (id integer primary key, ts1 timestamp, ts2 timestamp)");
+            }
+        }
+
+        runner.addControllerService("dbcp", service);
+        runner.enableControllerService(service);
+        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+
+        final String arg2TS = "2001-01-01 23:01:01.001";
+        final String art3TS = "2002-02-02 22:02:02.002";
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS");
+        java.util.Date parsedDate = dateFormat.parse(arg2TS);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("sql.args.1.type", String.valueOf(Types.TIMESTAMP));
+        attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));
+        attributes.put("sql.args.2.type", String.valueOf(Types.TIMESTAMP));
+        attributes.put("sql.args.2.value", art3TS);
+
+        runner.enqueue("INSERT INTO TIMESTAMPTESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+        try (final Connection conn = service.getConnection()) {
+            try (final Statement stmt = conn.createStatement()) {
+                final ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTESTS");
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+                assertEquals(arg2TS, rs.getString(2));
+                assertEquals(art3TS, rs.getString(3));
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
     public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
         final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
         runner.addControllerService("dbcp", service);

Reply via email to