This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f534dcc42 NIFI-12710 Support microsecond precision for Timestamp 
Record fields
5f534dcc42 is described below

commit 5f534dcc42e99a87dd8315d0d424a3b615018cd4
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Wed Jan 31 21:33:59 2024 -0500

    NIFI-12710 Support microsecond precision for Timestamp Record fields
    
    - PutDatabaseRecordIT supports operating systems with either nanosecond or 
microsecond precision
    
    This closes #8332
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../field/ObjectLocalDateTimeFieldConverter.java   |  67 +++++-
 .../TestObjectLocalDateTimeFieldConverter.java     |  88 +++++++
 .../nifi/util/text/RegexDateTimeMatcher.java       |   8 +-
 .../nifi-standard-processors/pom.xml               |  31 +++
 .../processors/standard/PutDatabaseRecordIT.java   | 263 +++++++++++++++++++++
 .../PutDatabaseRecordIT/create-person-table.sql    |   7 +
 6 files changed, 448 insertions(+), 16 deletions(-)

diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectLocalDateTimeFieldConverter.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectLocalDateTimeFieldConverter.java
index 860f0ebf4d..6408949537 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectLocalDateTimeFieldConverter.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectLocalDateTimeFieldConverter.java
@@ -30,6 +30,8 @@ import java.util.Optional;
  * Convert Object to java.time.LocalDateTime using instanceof evaluation and 
optional format pattern for DateTimeFormatter
  */
 class ObjectLocalDateTimeFieldConverter implements FieldConverter<Object, 
LocalDateTime> {
+    private static final long YEAR_TEN_THOUSAND = 253_402_300_800_000L;
+
     /**
      * Convert Object field to java.sql.Timestamp using optional format 
supported in DateTimeFormatter
      *
@@ -51,10 +53,13 @@ class ObjectLocalDateTimeFieldConverter implements 
FieldConverter<Object, LocalD
             final Instant instant = Instant.ofEpochMilli(date.getTime());
             return ofInstant(instant);
         }
-        if (field instanceof Number) {
-            final Number number = (Number) field;
-            final Instant instant = Instant.ofEpochMilli(number.longValue());
-            return ofInstant(instant);
+        if (field instanceof final Number number) {
+            // If value is a floating point number, we consider it as seconds 
since epoch plus a decimal part for fractions of a second.
+            if (field instanceof Double || field instanceof Float) {
+                return toLocalDateTime(number.doubleValue());
+            }
+
+            return toLocalDateTime(number.longValue());
         }
         if (field instanceof String) {
             final String string = field.toString().trim();
@@ -67,22 +72,60 @@ class ObjectLocalDateTimeFieldConverter implements 
FieldConverter<Object, LocalD
                 try {
                     return LocalDateTime.parse(string, formatter);
                 } catch (final DateTimeParseException e) {
-                    throw new FieldConversionException(LocalDateTime.class, 
field, name, e);
+                    return tryParseAsNumber(string, name);
                 }
             } else {
-                try {
-                    final long number = Long.parseLong(string);
-                    final Instant instant = Instant.ofEpochMilli(number);
-                    return ofInstant(instant);
-                } catch (final NumberFormatException e) {
-                    throw new FieldConversionException(LocalDateTime.class, 
field, name, e);
-                }
+                return tryParseAsNumber(string, name);
             }
         }
 
         throw new FieldConversionException(LocalDateTime.class, field, name);
     }
 
+    /**
+     * Interpret the given String as a numeric timestamp. A value containing a decimal point is parsed as a double
+     * and handled as seconds since epoch with a fractional part; any other value is parsed as a long.
+     *
+     * @param value String to be parsed as a number
+     * @param fieldName Name of the field being converted, reported when the conversion fails
+     * @return LocalDateTime derived from the parsed number
+     * @throws FieldConversionException Thrown when the value cannot be parsed as a number
+     */
+    private LocalDateTime tryParseAsNumber(final String value, final String fieldName) {
+        final boolean decimal = value.contains(".");
+        try {
+            if (decimal) {
+                return toLocalDateTime(Double.parseDouble(value));
+            }
+            return toLocalDateTime(Long.parseLong(value));
+        } catch (final NumberFormatException e) {
+            throw new FieldConversionException(LocalDateTime.class, value, fieldName, e);
+        }
+    }
+
+    /**
+     * Convert a decimal number of seconds since epoch to a LocalDateTime with microsecond precision.
+     *
+     * @param secondsSinceEpoch Seconds since epoch where the decimal part represents fractions of a second
+     * @return LocalDateTime in the system default time zone
+     */
+    private LocalDateTime toLocalDateTime(final double secondsSinceEpoch) {
+        final long wholeSeconds = (long) secondsSinceEpoch;
+        // Determine the number of micros past the second by subtracting the whole seconds from the decimal value and
+        // multiplying by 1 million. The result is rounded to the nearest microsecond instead of being truncated: a double
+        // cannot represent most decimal fractions exactly, so the scaled fraction is often slightly below the intended
+        // whole number of micros and truncation would drop one microsecond.
+        final long micros = Math.round(1_000_000 * (secondsSinceEpoch - wholeSeconds));
+        // Convert micros to nanos. Note that we perform this as a separate operation, rather than multiplying by
+        // 1_000_000_000, in order to avoid issues that occur with rounding at high precision. A rounded value of
+        // 1_000_000 micros is acceptable because Instant.plusNanos carries the overflow into the next second.
+        final long nanos = micros * 1000L;
+
+        return toLocalDateTime(wholeSeconds, nanos);
+    }
+
+    /**
+     * Build a LocalDateTime from whole seconds since epoch plus a number of nanoseconds.
+     *
+     * @param epochSeconds Whole seconds since epoch
+     * @param nanosPastSecond Nanoseconds to add to the whole seconds
+     * @return LocalDateTime in the system default time zone
+     */
+    private LocalDateTime toLocalDateTime(final long epochSeconds, final long nanosPastSecond) {
+        final Instant wholeSecondInstant = Instant.ofEpochSecond(epochSeconds);
+        return ofInstant(wholeSecondInstant.plusNanos(nanosPastSecond));
+    }
+
+    /**
+     * Convert an integral timestamp to a LocalDateTime. A value greater than YEAR_TEN_THOUSAND is assumed to be
+     * microseconds since epoch; any other value is handled as milliseconds since epoch.
+     *
+     * @param value Timestamp in milliseconds or microseconds since epoch
+     * @return LocalDateTime in the system default time zone
+     */
+    private LocalDateTime toLocalDateTime(final long value) {
+        final Instant instant;
+        if (value > YEAR_TEN_THOUSAND) {
+            // Value is too large to be milliseconds: split the microseconds into whole seconds and a nanosecond adjustment.
+            final long epochSeconds = value / 1_000_000;
+            final long nanoAdjustment = (value % 1_000_000) * 1_000;
+            instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
+        } else {
+            instant = Instant.ofEpochMilli(value);
+        }
+
+        return ofInstant(instant);
+    }
+
     private LocalDateTime ofInstant(final Instant instant) {
         return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
     }
diff --git 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/TestObjectLocalDateTimeFieldConverter.java
 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/TestObjectLocalDateTimeFieldConverter.java
new file mode 100644
index 0000000000..d44826d45b
--- /dev/null
+++ 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/TestObjectLocalDateTimeFieldConverter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.field;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests conversion of numeric and String timestamps to LocalDateTime with millisecond and microsecond precision.
+ */
+public class TestObjectLocalDateTimeFieldConverter {
+    private static final String FIELD_NAME = "test";
+    private static final long MILLIS_TIMESTAMP_LONG = 1707238288351L;
+    private static final long MICROS_TIMESTAMP_LONG = 1707238288351567L;
+    private static final String MICROS_TIMESTAMP_STRING = Long.toString(MICROS_TIMESTAMP_LONG);
+    private static final double MICROS_TIMESTAMP_DOUBLE = ((double) MICROS_TIMESTAMP_LONG) / 1000000D;
+    private static final String MICROS_TIMESTAMP_DOUBLE_STRING = Double.toString(MICROS_TIMESTAMP_DOUBLE);
+    private static final long NANOS_AFTER_SECOND = 351567000L;
+    private static final Instant INSTANT_MILLIS_PRECISION = Instant.ofEpochMilli(MILLIS_TIMESTAMP_LONG);
+    // Instant representing the microsecond precision timestamp: whole seconds plus the nanoseconds after the second.
+    private static final Instant INSTANT_MICROS_PRECISION = Instant.ofEpochSecond(MICROS_TIMESTAMP_LONG / 1_000_000, NANOS_AFTER_SECOND);
+    private static final LocalDateTime LOCAL_DATE_TIME_MILLIS_PRECISION = LocalDateTime.ofInstant(INSTANT_MILLIS_PRECISION, ZoneId.systemDefault());
+    private static final LocalDateTime LOCAL_DATE_TIME_MICROS_PRECISION = LocalDateTime.ofInstant(INSTANT_MICROS_PRECISION, ZoneId.systemDefault());
+
+    private final ObjectLocalDateTimeFieldConverter converter = new ObjectLocalDateTimeFieldConverter();
+
+    @Test
+    public void testConvertTimestampMillis() {
+        final LocalDateTime result = converter.convertField(MILLIS_TIMESTAMP_LONG, Optional.empty(), FIELD_NAME);
+        assertEquals(LOCAL_DATE_TIME_MILLIS_PRECISION, result);
+    }
+
+    @Test
+    public void testConvertTimestampMicros() {
+        final LocalDateTime result = converter.convertField(MICROS_TIMESTAMP_LONG, Optional.empty(), FIELD_NAME);
+        final Instant resultInstant = result.atZone(ZoneId.systemDefault()).toInstant();
+
+        assertEquals(MILLIS_TIMESTAMP_LONG, resultInstant.toEpochMilli());
+        assertEquals(NANOS_AFTER_SECOND, resultInstant.getNano());
+    }
+
+    @Test
+    public void testConvertTimestampMicrosAsString() {
+        // Integral String without a decimal point: parsed as a long and detected as microseconds
+        final LocalDateTime result = converter.convertField(MICROS_TIMESTAMP_STRING, Optional.empty(), FIELD_NAME);
+        assertEquals(LOCAL_DATE_TIME_MICROS_PRECISION, result);
+    }
+
+    @Test
+    public void testDoubleAsEpochSeconds() {
+        final LocalDateTime result = converter.convertField(MICROS_TIMESTAMP_DOUBLE, Optional.empty(), FIELD_NAME);
+        assertEquals(LOCAL_DATE_TIME_MICROS_PRECISION, result);
+    }
+
+    @Test
+    public void testDoubleAsEpochSecondsAsString() {
+        // String containing a decimal point: parsed as a double and handled as seconds since epoch
+        final LocalDateTime result = converter.convertField(MICROS_TIMESTAMP_DOUBLE_STRING, Optional.empty(), FIELD_NAME);
+        assertEquals(LOCAL_DATE_TIME_MICROS_PRECISION, result);
+    }
+
+    @Test
+    public void testWithDateFormatMillisPrecision() {
+        final long millis = System.currentTimeMillis();
+        final LocalDateTime result = converter.convertField(millis, Optional.of("yyyy-MM-dd'T'HH:mm:ss.SSS"), FIELD_NAME);
+        assertEquals(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.systemDefault()), result);
+    }
+
+    @Test
+    public void testWithDateFormatMicrosecondPrecision() {
+        final LocalDateTime result = converter.convertField(MICROS_TIMESTAMP_LONG, Optional.of("yyyy-MM-dd'T'HH:mm:ss.SSSSSS"), FIELD_NAME);
+        assertEquals(LOCAL_DATE_TIME_MICROS_PRECISION, result);
+    }
+}
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/RegexDateTimeMatcher.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/RegexDateTimeMatcher.java
index 8bd27ff1d5..c83eb46ae4 100644
--- 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/RegexDateTimeMatcher.java
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/text/RegexDateTimeMatcher.java
@@ -359,7 +359,7 @@ public class RegexDateTimeMatcher implements 
DateTimeMatcher {
                     addSecondInMinute();
                     break;
                 case 'S':
-                    addMillisecond();
+                    addSubsecond();
                     break;
                 case 'z':
                     addGeneralTimeZone();
@@ -468,9 +468,9 @@ public class RegexDateTimeMatcher implements 
DateTimeMatcher {
             range = range.plus(1, 2);
         }
 
-        private void addMillisecond() {
-            patterns.add("\\d{1,3}");
-            range = range.plus(1, 3);
+        // Adds a pattern matching between one and charCount digits and updates range with the same bounds.
+        // NOTE(review): charCount is defined outside this hunk; presumably the number of consecutive 'S' letters
+        // in the format being compiled - confirm.
+        private void addSubsecond() {
+            patterns.add("\\d{1," + charCount + "}");
+            range = range.plus(1, charCount);
         }
 
         private void addGeneralTimeZone() {
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 1048db9509..0d7a1f20fb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -488,6 +488,36 @@
             <artifactId>nifi-json-schema-shared</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
+
+        <!-- Test Dependencies for database processors -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>1.19.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.7.1</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
@@ -747,6 +777,7 @@
                         
<exclude>src/test/resources/TestXml/xml-bundle-1</exclude>
                         
<exclude>src/test/resources/xxe_from_report.xml</exclude>
                         <exclude>src/test/resources/xxe_template.xml</exclude>
+                        
<exclude>src/test/resources/PutDatabaseRecordIT/create-person-table.sql</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordIT.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordIT.java
new file mode 100644
index 0000000000..1c2278e754
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordIT.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.dbcp.DBCPConnectionPool;
+import org.apache.nifi.dbcp.utils.DBCPProperties;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.DateTimeUtils;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.Month;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Integration test running PutDatabaseRecord against a PostgreSQL container, covering timestamps provided as
+ * JSON numbers, numeric Strings and formatted Strings with millisecond and microsecond precision.
+ */
+@SuppressWarnings("resource")
+public class PutDatabaseRecordIT {
+
+    private static final long MILLIS_TIMESTAMP_LONG = 1707238288351L;
+    private static final long MICROS_TIMESTAMP_LONG = 1707238288351567L;
+    private static final String MICROS_TIMESTAMP_FORMATTED = "2024-02-06 11:51:28.351567";
+    private static final double MICROS_TIMESTAMP_DOUBLE = ((double) MICROS_TIMESTAMP_LONG) / 1000000D;
+    private static final long NANOS_AFTER_SECOND = 351567000L;
+    // Instant representing the microsecond precision timestamp: whole seconds plus the nanoseconds after the second.
+    private static final Instant INSTANT_MICROS_PRECISION = Instant.ofEpochSecond(MICROS_TIMESTAMP_LONG / 1_000_000, NANOS_AFTER_SECOND);
+    // Name under which the lastTransactionTime column is read back from the result set
+    private static final String TIMESTAMP_COLUMN = "lasttransactiontime";
+
+    private static PostgreSQLContainer<?> postgres;
+    private TestRunner runner;
+
+    @BeforeAll
+    public static void startPostgres() {
+        postgres = new PostgreSQLContainer<>("postgres:9.6.12")
+            .withInitScript("PutDatabaseRecordIT/create-person-table.sql");
+        postgres.start();
+    }
+
+    @AfterAll
+    public static void cleanup() {
+        if (postgres != null) {
+            postgres.close();
+            postgres = null;
+        }
+    }
+
+    @BeforeEach
+    public void setup() throws InitializationException, SQLException {
+        truncateTable();
+
+        runner = TestRunners.newTestRunner(PutDatabaseRecord.class);
+        final DBCPConnectionPool connectionPool = new DBCPConnectionPool();
+        runner.addControllerService("connectionPool", connectionPool);
+        runner.setProperty(connectionPool, DBCPProperties.DATABASE_URL, postgres.getJdbcUrl());
+        runner.setProperty(connectionPool, DBCPProperties.DB_USER, postgres.getUsername());
+        runner.setProperty(connectionPool, DBCPProperties.DB_PASSWORD, postgres.getPassword());
+        runner.setProperty(connectionPool, DBCPProperties.DB_DRIVERNAME, postgres.getDriverClassName());
+        runner.enableControllerService(connectionPool);
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("json-reader", jsonReader);
+        runner.setProperty(jsonReader, DateTimeUtils.DATE_FORMAT, "yyyy-MM-dd");
+        runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss.SSSSSS");
+        runner.enableControllerService(jsonReader);
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "json-reader");
+        runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "connectionPool");
+
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, "person");
+        runner.setProperty(PutDatabaseRecord.DB_TYPE, "PostgreSQL");
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, "INSERT");
+    }
+
+    @Test
+    public void testSimplePut() throws SQLException {
+        final Map<String, Object> results = putAndGetResults("""
+            {
+              "name": "John Doe",
+              "age": 50,
+              "favorite_color": "blue"
+            }
+            """);
+
+        assertEquals("blue", results.get("favorite_color"));
+    }
+
+    @Test
+    public void testWithDate() throws SQLException {
+        final Map<String, Object> results = putAndGetResults("""
+            {
+              "name": "John Doe",
+              "age": 50,
+              "dob": "1975-01-01"
+            }
+            """);
+
+        final Date dob = (Date) results.get("dob");
+        assertEquals(1975, dob.toLocalDate().getYear());
+        assertEquals(Month.JANUARY, dob.toLocalDate().getMonth());
+        assertEquals(1, dob.toLocalDate().getDayOfMonth());
+    }
+
+    @Test
+    public void testWithTimestampUsingMillis() throws SQLException {
+        // JSON number
+        final Map<String, Object> results = putAndGetResults(createJson(MILLIS_TIMESTAMP_LONG));
+        assertEquals(new Timestamp(MILLIS_TIMESTAMP_LONG), results.get(TIMESTAMP_COLUMN));
+    }
+
+    @Test
+    public void testWithTimestampUsingMillisAsString() throws SQLException {
+        // JSON string holding the same number
+        final Map<String, Object> results = putAndGetResults(createJson(Long.toString(MILLIS_TIMESTAMP_LONG)));
+        assertEquals(new Timestamp(MILLIS_TIMESTAMP_LONG), results.get(TIMESTAMP_COLUMN));
+    }
+
+    @Test
+    public void testWithStringTimestampUsingMicros() throws SQLException {
+        final Map<String, Object> results = putAndGetResults(createJson(MICROS_TIMESTAMP_FORMATTED));
+
+        final Timestamp lastTransactionTime = (Timestamp) results.get(TIMESTAMP_COLUMN);
+        final LocalDateTime transactionLocalTime = lastTransactionTime.toLocalDateTime();
+        assertEquals(2024, transactionLocalTime.getYear());
+        assertEquals(Month.FEBRUARY, transactionLocalTime.getMonth());
+        assertEquals(6, transactionLocalTime.getDayOfMonth());
+        assertEquals(11, transactionLocalTime.getHour());
+        assertEquals(51, transactionLocalTime.getMinute());
+        assertEquals(28, transactionLocalTime.getSecond());
+        assertEquals(351567000, transactionLocalTime.getNano());
+    }
+
+    @Test
+    public void testWithNumericTimestampUsingMicros() throws SQLException {
+        // JSON number
+        final Map<String, Object> results = putAndGetResults(createJson(MICROS_TIMESTAMP_LONG));
+        assertMicrosPrecision(results);
+    }
+
+    @Test
+    public void testWithDecimalTimestampUsingMicros() throws SQLException {
+        // JSON number with a decimal part
+        final Map<String, Object> results = putAndGetResults(createJson(MICROS_TIMESTAMP_DOUBLE));
+        assertMicrosPrecision(results);
+    }
+
+    @Test
+    public void testWithDecimalTimestampUsingMicrosAsString() throws SQLException {
+        // JSON string holding the same decimal number
+        final Map<String, Object> results = putAndGetResults(createJson(Double.toString(MICROS_TIMESTAMP_DOUBLE)));
+        assertMicrosPrecision(results);
+    }
+
+    // Runs the processor once against the given JSON, expects a single success FlowFile and returns the stored row.
+    private Map<String, Object> putAndGetResults(final String json) throws SQLException {
+        runner.enqueue(json);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);
+        return getResults();
+    }
+
+    // Asserts that the stored timestamp matches the expected microsecond precision instant.
+    private static void assertMicrosPrecision(final Map<String, Object> results) {
+        final Timestamp lastTransactionTime = (Timestamp) results.get(TIMESTAMP_COLUMN);
+        assertEquals(INSTANT_MICROS_PRECISION, lastTransactionTime.toInstant());
+    }
+
+    private static void truncateTable() throws SQLException {
+        try (final Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())) {
+            final String sqlQuery = "TRUNCATE TABLE person";
+            try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery)) {
+                preparedStatement.execute();
+            }
+        }
+    }
+
+    // Reads the person table into a column-name to value map and verifies the name and age shared by every test record.
+    private Map<String, Object> getResults() throws SQLException {
+        try (final Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())) {
+            final String sqlQuery = "SELECT * FROM person";
+            final Map<String, Object> resultsMap = new HashMap<>();
+
+            try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
+                 final ResultSet resultSet = preparedStatement.executeQuery()) {
+
+                final ResultSetMetaData metaData = resultSet.getMetaData();
+                final int columnCount = metaData.getColumnCount();
+
+                while (resultSet.next()) {
+                    for (int i = 1; i <= columnCount; i++) {
+                        final String columnName = metaData.getColumnName(i);
+                        final Object columnValue = resultSet.getObject(i);
+                        resultsMap.put(columnName, columnValue);
+                    }
+                }
+            }
+
+            assertEquals("John Doe", resultsMap.get("name"));
+            assertEquals(50, resultsMap.get("age"));
+
+            return resultsMap;
+        }
+    }
+
+    // Timestamp written as an unquoted JSON number
+    private static String createJson(final long lastTransactionTime) {
+        return createJsonWithRawValue(Long.toString(lastTransactionTime));
+    }
+
+    // Timestamp written as an unquoted JSON number with a decimal part
+    private static String createJson(final double lastTransactionTime) {
+        return createJsonWithRawValue(Double.toString(lastTransactionTime));
+    }
+
+    // Timestamp written as a quoted JSON string
+    private static String createJson(final String lastTransactionTime) {
+        return createJsonWithRawValue("\"%s\"".formatted(lastTransactionTime));
+    }
+
+    private static String createJsonWithRawValue(final String rawJsonValue) {
+        return """
+            {
+              "name": "John Doe",
+              "age": 50,
+              "lastTransactionTime": %s
+            }""".formatted(rawJsonValue);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/PutDatabaseRecordIT/create-person-table.sql
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/PutDatabaseRecordIT/create-person-table.sql
new file mode 100644
index 0000000000..eed14bb75e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/PutDatabaseRecordIT/create-person-table.sql
@@ -0,0 +1,7 @@
+-- Table used by PutDatabaseRecordIT. Column names are unquoted; the test reads the
+-- lastTransactionTime column back under the name "lasttransactiontime".
+CREATE TABLE person (
+    name VARCHAR(255) NOT NULL,
+    age INT,
+    favorite_color VARCHAR(255),
+    dob DATE,
+    lastTransactionTime TIMESTAMP WITH TIME ZONE
+);

Reply via email to