This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ca43615  NIFI-7714: QueryCassandra loses precision when converting timestamps to JSON
ca43615 is described below

commit ca43615702eabddc6f7bbf280f0d181d7aa0b0d4
Author: Denes Arvay <de...@apache.org>
AuthorDate: Fri Aug 7 14:27:40 2020 +0200

    NIFI-7714: QueryCassandra loses precision when converting timestamps to JSON
    
    Updated the patch based on @tpalfy's review
    Updated the patch based on @mattyb149's review
    Rename DATE_FORMAT_PATTERN to JSON_TIMESTAMP_FORMAT_PATTERN
    Changed convertToJsonStream method's visibility to package private.
    Removed json prefix from timestamp-format-pattern property to make it more generic
    
    This closes #4463.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../nifi/processors/cassandra/QueryCassandra.java  | 58 ++++++++++++++++++----
 .../cassandra/CassandraQueryTestUtil.java          | 32 ++++++++++++
 .../processors/cassandra/QueryCassandraTest.java   | 48 ++++++++++++++++++
 3 files changed, 129 insertions(+), 9 deletions(-)

diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
index ebad736..6212082 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
@@ -25,6 +25,7 @@ import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.core.exceptions.QueryExecutionException;
 import com.datastax.driver.core.exceptions.QueryValidationException;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.DataFileWriter;
@@ -39,6 +40,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -68,6 +70,7 @@ import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.ExecutionException;
@@ -130,6 +133,24 @@ public class QueryCassandra extends 
AbstractCassandraProcessor {
             .defaultValue(AVRO_FORMAT)
             .build();
 
+    public static final PropertyDescriptor TIMESTAMP_FORMAT_PATTERN = new 
PropertyDescriptor.Builder()
+            .name("timestamp-format-pattern")
+            .displayName("Timestamp Format Pattern for JSON output")
+            .description("Pattern to use when converting timestamp fields to 
JSON. Note: the formatted timestamp will be in UTC timezone.")
+            .required(true)
+            .defaultValue("yyyy-MM-dd HH:mm:ssZ")
+            .addValidator((subject, input, context) -> {
+                final ValidationResult.Builder vrb = new 
ValidationResult.Builder().subject(subject).input(input);
+                try {
+                    new SimpleDateFormat(input).format(new Date());
+                    vrb.valid(true).explanation("Valid date format pattern");
+                } catch (Exception ex) {
+                    vrb.valid(false).explanation("the pattern is invalid: " + 
ex.getMessage());
+                }
+                return vrb.build();
+            })
+            .build();
+
     private final static List<PropertyDescriptor> propertyDescriptors;
 
     private final static Set<Relationship> relationships;
@@ -145,6 +166,7 @@ public class QueryCassandra extends 
AbstractCassandraProcessor {
         _propertyDescriptors.add(QUERY_TIMEOUT);
         _propertyDescriptors.add(FETCH_SIZE);
         _propertyDescriptors.add(OUTPUT_FORMAT);
+        _propertyDescriptors.add(TIMESTAMP_FORMAT_PATTERN);
         propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
 
         Set<Relationship> _relationships = new HashSet<>();
@@ -220,14 +242,14 @@ public class QueryCassandra extends 
AbstractCassandraProcessor {
                             if (AVRO_FORMAT.equals(outputFormat)) {
                                 nrOfRows.set(convertToAvroStream(resultSet, 
out, queryTimeout, TimeUnit.MILLISECONDS));
                             } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(resultSet, 
out, charset, queryTimeout, TimeUnit.MILLISECONDS));
+                                
nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, 
queryTimeout, TimeUnit.MILLISECONDS));
                             }
                         } else {
                             resultSet = queryFuture.getUninterruptibly();
                             if (AVRO_FORMAT.equals(outputFormat)) {
                                 nrOfRows.set(convertToAvroStream(resultSet, 
out, 0, null));
                             } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(resultSet, 
out, charset, 0, null));
+                                
nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, 
0, null));
                             }
                         }
 
@@ -381,6 +403,13 @@ public class QueryCassandra extends 
AbstractCassandraProcessor {
      */
     public static long convertToJsonStream(final ResultSet rs, final 
OutputStream outStream,
                                            Charset charset, long timeout, 
TimeUnit timeUnit)
+        throws IOException, InterruptedException, TimeoutException, 
ExecutionException {
+        return convertToJsonStream(Optional.empty(), rs, outStream, charset, 
timeout, timeUnit);
+    }
+
+    @VisibleForTesting
+    static long convertToJsonStream(final Optional<ProcessContext> context, 
final ResultSet rs, final OutputStream outStream,
+                                           Charset charset, long timeout, 
TimeUnit timeUnit)
             throws IOException, InterruptedException, TimeoutException, 
ExecutionException {
 
         try {
@@ -425,7 +454,7 @@ public class QueryCassandra extends 
AbstractCassandraProcessor {
                                         if (!first) {
                                             sb.append(",");
                                         }
-                                        sb.append(getJsonElement(element));
+                                        sb.append(getJsonElement(context, 
element));
                                         first = false;
                                     }
                                     sb.append("]");
@@ -441,15 +470,15 @@ public class QueryCassandra extends 
AbstractCassandraProcessor {
                                         if (!first) {
                                             sb.append(",");
                                         }
-                                        sb.append(getJsonElement(mapKey));
+                                        sb.append(getJsonElement(context, 
mapKey));
                                         sb.append(":");
-                                        sb.append(getJsonElement(mapValue));
+                                        sb.append(getJsonElement(context, 
mapValue));
                                         first = false;
                                     }
                                     sb.append("}");
                                     valueString = sb.toString();
                                 } else {
-                                    valueString = getJsonElement(value);
+                                    valueString = getJsonElement(context, 
value);
                                 }
                                 outStream.write(("\"" + colName + "\":"
                                         + valueString + "").getBytes(charset));
@@ -467,12 +496,14 @@ public class QueryCassandra extends 
AbstractCassandraProcessor {
     }
 
     protected static String getJsonElement(Object value) {
+        return getJsonElement(Optional.empty(), value);
+    }
+
+    protected static String getJsonElement(final Optional<ProcessContext> 
context, Object value) {
         if (value instanceof Number) {
             return value.toString();
         } else if (value instanceof Date) {
-            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ssZ");
-            dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-            return "\"" + dateFormat.format((Date) value) + "\"";
+            return "\"" + getFormattedDate(context, (Date) value) + "\"";
         } else if (value instanceof String) {
             return "\"" + StringEscapeUtils.escapeJson((String) value) + "\"";
         } else {
@@ -480,6 +511,15 @@ public class QueryCassandra extends 
AbstractCassandraProcessor {
         }
     }
 
+    private static String getFormattedDate(final Optional<ProcessContext> 
context, Date value) {
+        final String dateFormatPattern = context
+                .map(_context -> 
_context.getProperty(TIMESTAMP_FORMAT_PATTERN).getValue())
+                .orElse(TIMESTAMP_FORMAT_PATTERN.getDefaultValue());
+        SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern);
+        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+        return dateFormat.format(value);
+    }
+
     /**
      * Creates an Avro schema from the given result set. The metadata (column 
definitions, data types, etc.) is used
      * to determine a schema for Avro.
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
index ab85e9f..d5e5a08 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
@@ -27,8 +27,10 @@ import org.mockito.stubbing.Answer;
 
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
+import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
+import java.util.GregorianCalendar;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,6 +47,15 @@ import static org.mockito.Mockito.when;
  * Utility methods for Cassandra processors' unit tests
  */
 public class CassandraQueryTestUtil {
+
+    static final Date TEST_DATE;
+    static {
+        Calendar c = 
GregorianCalendar.getInstance(TimeZone.getTimeZone("PST"));
+        c.set(2020, Calendar.JANUARY, 1, 10, 10, 10);
+        c.set(Calendar.MILLISECOND, 10);
+        TEST_DATE = c.getTime();
+    }
+
     public static ResultSet createMockResultSet() throws Exception {
         ResultSet resultSet = mock(ResultSet.class);
         ColumnDefinitions columnDefinitions = mock(ColumnDefinitions.class);
@@ -140,6 +151,27 @@ public class CassandraQueryTestUtil {
         return resultSet;
     }
 
+    public static ResultSet createMockDateResultSet() throws Exception {
+        ResultSet resultSet = mock(ResultSet.class);
+        ColumnDefinitions columnDefinitions = mock(ColumnDefinitions.class);
+
+        when(columnDefinitions.size()).thenReturn(1);
+        when(columnDefinitions.getName(anyInt())).thenReturn("date");
+        when(columnDefinitions.getTable(0)).thenReturn("users");
+        
when(columnDefinitions.getType(anyInt())).thenReturn(DataType.timestamp());
+
+        Row row = mock(Row.class);
+        when(row.getTimestamp(0)).thenReturn(TEST_DATE);
+        List<Row> rows = Collections.singletonList(row);
+
+        when(resultSet.iterator()).thenReturn(rows.iterator());
+        when(resultSet.all()).thenReturn(rows);
+        when(resultSet.getAvailableWithoutFetching()).thenReturn(rows.size());
+        when(resultSet.isFullyFetched()).thenReturn(false).thenReturn(true);
+        when(resultSet.getColumnDefinitions()).thenReturn(columnDefinitions);
+        return resultSet;
+    }
+
     public static Row createRow(String user_id, String first_name, String 
last_name, Set<String> emails,
                                 List<String> top_places, Map<Date, String> 
todo, boolean registered,
                                 float scale, double metric) {
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
index dd6301f..330b776 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
@@ -39,16 +39,23 @@ import 
com.datastax.driver.core.exceptions.ReadTimeoutException;
 import java.io.ByteArrayOutputStream;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import javax.net.ssl.SSLContext;
 import org.apache.avro.Schema;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -75,6 +82,11 @@ public class QueryCassandraTest {
         testRunner.assertNotValid();
         testRunner.setProperty(AbstractCassandraProcessor.USERNAME, 
"username");
         testRunner.assertValid();
+
+        testRunner.setProperty(QueryCassandra.TIMESTAMP_FORMAT_PATTERN, 
"invalid format");
+        testRunner.assertNotValid();
+        testRunner.setProperty(QueryCassandra.TIMESTAMP_FORMAT_PATTERN, 
"yyyy-MM-dd HH:mm:ss.SSSZ");
+        testRunner.assertValid();
     }
 
     @Test
@@ -368,6 +380,42 @@ public class QueryCassandraTest {
         assertEquals(2, numberOfRows);
     }
 
+    @Test
+    public void testDefaultDateFormatInConvertToJSONStream() throws Exception {
+        ResultSet rs = CassandraQueryTestUtil.createMockDateResultSet();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        DateFormat df = new 
SimpleDateFormat(QueryCassandra.TIMESTAMP_FORMAT_PATTERN.getDefaultValue());
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        long numberOfRows = 
QueryCassandra.convertToJsonStream(Optional.of(testRunner.getProcessContext()), 
rs, baos,
+            StandardCharsets.UTF_8, 0, null);
+        assertEquals(1, numberOfRows);
+
+        Map<String, List<Map<String, String>>> map = new 
ObjectMapper().readValue(baos.toByteArray(), HashMap.class);
+        String date = map.get("results").get(0).get("date");
+        assertEquals(df.format(CassandraQueryTestUtil.TEST_DATE), date);
+    }
+
+    @Test
+    public void testCustomDateFormatInConvertToJSONStream() throws Exception {
+        MockProcessContext context = (MockProcessContext) 
testRunner.getProcessContext();
+        ResultSet rs = CassandraQueryTestUtil.createMockDateResultSet();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final String customDateFormat = "yyyy-MM-dd HH:mm:ss.SSSZ";
+        context.setProperty(QueryCassandra.TIMESTAMP_FORMAT_PATTERN, 
customDateFormat);
+        DateFormat df = new SimpleDateFormat(customDateFormat);
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        long numberOfRows = 
QueryCassandra.convertToJsonStream(Optional.of(context), rs, baos, 
StandardCharsets.UTF_8, 0, null);
+        assertEquals(1, numberOfRows);
+
+        Map<String, List<Map<String, String>>> map = new 
ObjectMapper().readValue(baos.toByteArray(), HashMap.class);
+        String date = map.get("results").get(0).get("date");
+        assertEquals(df.format(CassandraQueryTestUtil.TEST_DATE), date);
+    }
+
     private void setUpStandardProcessorConfig() {
         testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, 
"ONE");
         testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, 
"localhost:9042");

Reply via email to