Repository: nifi
Updated Branches:
  refs/heads/master 219234d00 -> 557d6365b


NIFI-2518: Added unit test showing issue

NIFI-2518: Added support for fractional seconds to AbstractDatabaseFetchProcessor

This closes #821


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/557d6365
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/557d6365
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/557d6365

Branch: refs/heads/master
Commit: 557d6365bf27ee2271cd9d3a147361acb6cc9c6c
Parents: 219234d
Author: Matt Burgess <mattyb...@apache.org>
Authored: Tue Aug 9 10:18:26 2016 -0400
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Wed Aug 10 11:10:17 2016 -0400

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java         | 21 +++++---
 .../standard/QueryDatabaseTableTest.java        | 52 +++++++++++++++++++-
 2 files changed, 65 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/557d6365/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index 6182d93..7b30479 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -35,6 +35,7 @@ import java.sql.Statement;
 import java.sql.Timestamp;
 import java.text.DecimalFormat;
 import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -135,6 +136,8 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
     protected final static Map<String, DatabaseAdapter> dbAdapters = new 
HashMap<>();
     protected final Map<String, Integer> columnTypeMap = new HashMap<>();
 
+    private static SimpleDateFormat TIME_TYPE_FORMAT = new 
SimpleDateFormat("HH:mm:ss.SSS");
+
     static {
         // Load the DatabaseAdapters
         ServiceLoader<DatabaseAdapter> dbAdapterLoader = 
ServiceLoader.load(DatabaseAdapter.class);
@@ -275,14 +278,19 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
                 break;
 
             case TIME:
-                Date rawColTimeValue = resultSet.getDate(columnIndex);
-                java.sql.Time colTimeValue = new 
java.sql.Time(rawColTimeValue.getTime());
-                java.sql.Time maxTimeValue = null;
+                // Compare milliseconds-since-epoch. Need getTimestamp() instead of getTime() since some databases
+                // don't return milliseconds in the Time returned by getTime().
+                Date colTimeValue = new 
Date(resultSet.getTimestamp(columnIndex).getTime());
+                Date maxTimeValue = null;
                 if (maxValueString != null) {
-                    maxTimeValue = java.sql.Time.valueOf(maxValueString);
+                    try {
+                        maxTimeValue = TIME_TYPE_FORMAT.parse(maxValueString);
+                    } catch (ParseException pe) {
+                        // Shouldn't happen, but just in case, leave the value as null so the new value will be stored
+                    }
                 }
                 if (maxTimeValue == null || colTimeValue.after(maxTimeValue)) {
-                    return colTimeValue.toString();
+                    return TIME_TYPE_FORMAT.format(colTimeValue);
                 }
                 break;
 
@@ -299,8 +307,7 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
                         return oracleTimestampValue.toString();
                     }
                 } else {
-                    Timestamp rawColTimestampValue = 
resultSet.getTimestamp(columnIndex);
-                    java.sql.Timestamp colTimestampValue = new 
java.sql.Timestamp(rawColTimestampValue.getTime());
+                    Timestamp colTimestampValue = 
resultSet.getTimestamp(columnIndex);
                     java.sql.Timestamp maxTimestampValue = null;
                     if (maxValueString != null) {
                         maxTimestampValue = 
java.sql.Timestamp.valueOf(maxValueString);

http://git-wip-us.apache.org/repos/asf/nifi/blob/557d6365/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 5aec899..a373f8f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -278,8 +278,58 @@ public class QueryDatabaseTableTest {
         runner.clearTransferState();
     }
 
-
     @Test
+    public void testTimestampNanos() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.000123456')");
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, 
"created_on");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        InputStream in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
0);
+        runner.clearTransferState();
+
+        // Add a new row with a lower timestamp (but same millisecond value), no flow file should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.000')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
0);
+        runner.clearTransferState();
+
+        // Add a new row with a higher timestamp, one flow file should be transferred
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.0003')");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
1);
+        in = new 
ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+        runner.clearTransferState();
+
+        // Run again, this time no flowfiles/rows should be transferred
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
0);
+        runner.clearTransferState();
+    }
+
+        @Test
     public void testWithNullIntColumn() throws SQLException {
         // load test data to database
         final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();

Reply via email to