Repository: nifi
Updated Branches:
  refs/heads/master 219234d00 -> 557d6365b
NIFI-2518: Added unit test showing issue

NIFI-2518: Added support for fractional seconds to AbstractDatabaseFetchProcessor

This closes #821

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/557d6365
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/557d6365
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/557d6365

Branch: refs/heads/master
Commit: 557d6365bf27ee2271cd9d3a147361acb6cc9c6c
Parents: 219234d
Author: Matt Burgess <mattyb...@apache.org>
Authored: Tue Aug 9 10:18:26 2016 -0400
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Wed Aug 10 11:10:17 2016 -0400

----------------------------------------------------------------------
 .../AbstractDatabaseFetchProcessor.java | 21 +++++---
 .../standard/QueryDatabaseTableTest.java | 52 +++++++++++++++++++-
 2 files changed, 65 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/557d6365/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index 6182d93..7b30479 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -35,6 +35,7 @@ import java.sql.Statement; import java.sql.Timestamp; import
java.text.DecimalFormat; import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -135,6 +136,8 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>(); protected final Map<String, Integer> columnTypeMap = new HashMap<>(); + private static SimpleDateFormat TIME_TYPE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS"); + static { // Load the DatabaseAdapters ServiceLoader<DatabaseAdapter> dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class); @@ -275,14 +278,19 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact break; case TIME: - Date rawColTimeValue = resultSet.getDate(columnIndex); - java.sql.Time colTimeValue = new java.sql.Time(rawColTimeValue.getTime()); - java.sql.Time maxTimeValue = null; + // Compare milliseconds-since-epoch. Need getTimestamp() instead of getTime() since some databases + // don't return milliseconds in the Time returned by getTime(). 
+ Date colTimeValue = new Date(resultSet.getTimestamp(columnIndex).getTime()); + Date maxTimeValue = null; if (maxValueString != null) { - maxTimeValue = java.sql.Time.valueOf(maxValueString); + try { + maxTimeValue = TIME_TYPE_FORMAT.parse(maxValueString); + } catch (ParseException pe) { + // Shouldn't happen, but just in case, leave the value as null so the new value will be stored + } } if (maxTimeValue == null || colTimeValue.after(maxTimeValue)) { - return colTimeValue.toString(); + return TIME_TYPE_FORMAT.format(colTimeValue); } break; @@ -299,8 +307,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact return oracleTimestampValue.toString(); } } else { - Timestamp rawColTimestampValue = resultSet.getTimestamp(columnIndex); - java.sql.Timestamp colTimestampValue = new java.sql.Timestamp(rawColTimestampValue.getTime()); + Timestamp colTimestampValue = resultSet.getTimestamp(columnIndex); java.sql.Timestamp maxTimestampValue = null; if (maxValueString != null) { maxTimestampValue = java.sql.Timestamp.valueOf(maxValueString); http://git-wip-us.apache.org/repos/asf/nifi/blob/557d6365/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 5aec899..a373f8f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -278,8 +278,58 @@ public 
class QueryDatabaseTableTest { runner.clearTransferState(); } - @Test + public void testTimestampNanos() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.000123456')"); + + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on"); + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Add a new row with a lower timestamp (but same millisecond value), no flow file should be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.000')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Add a new row with a higher timestamp, one 
flow file should be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.0003')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + } + + @Test public void testWithNullIntColumn() throws SQLException { // load test data to database final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();