This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new ca43615 NIFI-7714: QueryCassandra loses precision when converting timestamps to JSON ca43615 is described below commit ca43615702eabddc6f7bbf280f0d181d7aa0b0d4 Author: Denes Arvay <de...@apache.org> AuthorDate: Fri Aug 7 14:27:40 2020 +0200 NIFI-7714: QueryCassandra loses precision when converting timestamps to JSON Updated the patch based on @tpalfy's review Updated the patch based on @mattyb149's review Rename DATE_FORMAT_PATTERN to JSON_TIMESTAMP_FORMAT_PATTERN Changed convertToJsonStream method's visibility to package private. Removed json prefix from timestamp-format-pattern property to make it more generic This closes #4463. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../nifi/processors/cassandra/QueryCassandra.java | 58 ++++++++++++++++++---- .../cassandra/CassandraQueryTestUtil.java | 32 ++++++++++++ .../processors/cassandra/QueryCassandraTest.java | 48 ++++++++++++++++++ 3 files changed, 129 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java index ebad736..6212082 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java @@ -25,6 +25,7 @@ import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.core.exceptions.QueryExecutionException; import com.datastax.driver.core.exceptions.QueryValidationException; +import com.google.common.annotations.VisibleForTesting; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.file.DataFileWriter; @@ -39,6 +40,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -68,6 +70,7 @@ import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ExecutionException; @@ -130,6 +133,24 @@ public class QueryCassandra extends AbstractCassandraProcessor { .defaultValue(AVRO_FORMAT) .build(); + public static final PropertyDescriptor TIMESTAMP_FORMAT_PATTERN = new PropertyDescriptor.Builder() + .name("timestamp-format-pattern") + .displayName("Timestamp Format Pattern for JSON output") + .description("Pattern to use when converting timestamp fields to JSON. Note: the formatted timestamp will be in UTC timezone.") + .required(true) + .defaultValue("yyyy-MM-dd HH:mm:ssZ") + .addValidator((subject, input, context) -> { + final ValidationResult.Builder vrb = new ValidationResult.Builder().subject(subject).input(input); + try { + new SimpleDateFormat(input).format(new Date()); + vrb.valid(true).explanation("Valid date format pattern"); + } catch (Exception ex) { + vrb.valid(false).explanation("the pattern is invalid: " + ex.getMessage()); + } + return vrb.build(); + }) + .build(); + private final static List<PropertyDescriptor> propertyDescriptors; private final static Set<Relationship> relationships; @@ -145,6 +166,7 @@ public class QueryCassandra extends AbstractCassandraProcessor { _propertyDescriptors.add(QUERY_TIMEOUT); _propertyDescriptors.add(FETCH_SIZE); _propertyDescriptors.add(OUTPUT_FORMAT); + _propertyDescriptors.add(TIMESTAMP_FORMAT_PATTERN); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); Set<Relationship> _relationships = new HashSet<>(); @@ -220,14 +242,14 @@ public class QueryCassandra extends AbstractCassandraProcessor { if (AVRO_FORMAT.equals(outputFormat)) { nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); } else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); + nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); } } else { resultSet = queryFuture.getUninterruptibly(); if (AVRO_FORMAT.equals(outputFormat)) { nrOfRows.set(convertToAvroStream(resultSet, out, 0, null)); } else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null)); + nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, 0, null)); } } @@ -381,6 +403,13 @@ public class QueryCassandra extends AbstractCassandraProcessor { */ public static long convertToJsonStream(final ResultSet rs, final OutputStream outStream, Charset charset, long timeout, TimeUnit timeUnit) + throws IOException, InterruptedException, TimeoutException, ExecutionException { + return convertToJsonStream(Optional.empty(), rs, outStream, charset, timeout, timeUnit); + } + + @VisibleForTesting + static long convertToJsonStream(final Optional<ProcessContext> context, final ResultSet rs, final OutputStream outStream, + Charset charset, long timeout, TimeUnit timeUnit) throws IOException, InterruptedException, TimeoutException, ExecutionException { try { @@ -425,7 +454,7 @@ public class QueryCassandra extends AbstractCassandraProcessor { if (!first) { sb.append(","); } - sb.append(getJsonElement(element)); + sb.append(getJsonElement(context, element)); first = false; } sb.append("]"); @@ -441,15 +470,15 @@ public class QueryCassandra extends AbstractCassandraProcessor { if (!first) { sb.append(","); } - sb.append(getJsonElement(mapKey)); + sb.append(getJsonElement(context, mapKey)); sb.append(":"); - sb.append(getJsonElement(mapValue)); + sb.append(getJsonElement(context, mapValue)); first = false; } sb.append("}"); valueString = sb.toString(); } else { - valueString = getJsonElement(value); + valueString = getJsonElement(context, value); } outStream.write(("\"" + colName + "\":" + valueString + "").getBytes(charset)); @@ -467,12 +496,14 @@ public class QueryCassandra extends AbstractCassandraProcessor { } protected static String getJsonElement(Object value) { + return getJsonElement(Optional.empty(), value); + } + + protected static String getJsonElement(final Optional<ProcessContext> context, Object value) { if (value instanceof Number) { return value.toString(); } else if (value instanceof Date) { - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ"); - dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - return "\"" + dateFormat.format((Date) value) + "\""; + return "\"" + getFormattedDate(context, (Date) value) + "\""; } else if (value instanceof String) { return "\"" + StringEscapeUtils.escapeJson((String) value) + "\""; } else { @@ -480,6 +511,15 @@ public class QueryCassandra extends AbstractCassandraProcessor { } } + private static String getFormattedDate(final Optional<ProcessContext> context, Date value) { + final String dateFormatPattern = context + .map(_context -> _context.getProperty(TIMESTAMP_FORMAT_PATTERN).getValue()) + .orElse(TIMESTAMP_FORMAT_PATTERN.getDefaultValue()); + SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern); + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + return dateFormat.format(value); + } + /** * Creates an Avro schema from the given result set. The metadata (column definitions, data types, etc.) is used * to determine a schema for Avro. diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java index ab85e9f..d5e5a08 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java @@ -27,8 +27,10 @@ import org.mockito.stubbing.Answer; import java.text.SimpleDateFormat; import java.util.Arrays; +import java.util.Calendar; import java.util.Collections; import java.util.Date; +import java.util.GregorianCalendar; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,6 +47,15 @@ import static org.mockito.Mockito.when; * Utility methods for Cassandra processors' unit tests */ public class CassandraQueryTestUtil { + + static final Date TEST_DATE; + static { + Calendar c = GregorianCalendar.getInstance(TimeZone.getTimeZone("PST")); + c.set(2020, Calendar.JANUARY, 1, 10, 10, 10); + c.set(Calendar.MILLISECOND, 10); + TEST_DATE = c.getTime(); + } + public static ResultSet createMockResultSet() throws Exception { ResultSet resultSet = mock(ResultSet.class); ColumnDefinitions columnDefinitions = mock(ColumnDefinitions.class); @@ -140,6 +151,27 @@ public class CassandraQueryTestUtil { return resultSet; } + public static ResultSet createMockDateResultSet() throws Exception { + ResultSet resultSet = mock(ResultSet.class); + ColumnDefinitions columnDefinitions = mock(ColumnDefinitions.class); + + when(columnDefinitions.size()).thenReturn(1); + when(columnDefinitions.getName(anyInt())).thenReturn("date"); + when(columnDefinitions.getTable(0)).thenReturn("users"); + when(columnDefinitions.getType(anyInt())).thenReturn(DataType.timestamp()); + + Row row = mock(Row.class); + when(row.getTimestamp(0)).thenReturn(TEST_DATE); + List<Row> rows = Collections.singletonList(row); + + when(resultSet.iterator()).thenReturn(rows.iterator()); + when(resultSet.all()).thenReturn(rows); + when(resultSet.getAvailableWithoutFetching()).thenReturn(rows.size()); + when(resultSet.isFullyFetched()).thenReturn(false).thenReturn(true); + when(resultSet.getColumnDefinitions()).thenReturn(columnDefinitions); + return resultSet; + } + public static Row createRow(String user_id, String first_name, String last_name, Set<String> emails, List<String> top_places, Map<Date, String> todo, boolean registered, float scale, double metric) { diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java index dd6301f..330b776 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java @@ -39,16 +39,23 @@ import com.datastax.driver.core.exceptions.ReadTimeoutException; import java.io.ByteArrayOutputStream; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TimeZone; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.net.ssl.SSLContext; import org.apache.avro.Schema; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.codehaus.jackson.map.ObjectMapper; import org.junit.Before; import org.junit.Test; @@ -75,6 +82,11 @@ public class QueryCassandraTest { testRunner.assertNotValid(); testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "username"); testRunner.assertValid(); + + testRunner.setProperty(QueryCassandra.TIMESTAMP_FORMAT_PATTERN, "invalid format"); + testRunner.assertNotValid(); + testRunner.setProperty(QueryCassandra.TIMESTAMP_FORMAT_PATTERN, "yyyy-MM-dd HH:mm:ss.SSSZ"); + testRunner.assertValid(); } @Test @@ -368,6 +380,42 @@ public class QueryCassandraTest { assertEquals(2, numberOfRows); } + @Test + public void testDefaultDateFormatInConvertToJSONStream() throws Exception { + ResultSet rs = CassandraQueryTestUtil.createMockDateResultSet(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + DateFormat df = new SimpleDateFormat(QueryCassandra.TIMESTAMP_FORMAT_PATTERN.getDefaultValue()); + df.setTimeZone(TimeZone.getTimeZone("UTC")); + + long numberOfRows = QueryCassandra.convertToJsonStream(Optional.of(testRunner.getProcessContext()), rs, baos, + StandardCharsets.UTF_8, 0, null); + assertEquals(1, numberOfRows); + + Map<String, List<Map<String, String>>> map = new ObjectMapper().readValue(baos.toByteArray(), HashMap.class); + String date = map.get("results").get(0).get("date"); + assertEquals(df.format(CassandraQueryTestUtil.TEST_DATE), date); + } + + @Test + public void testCustomDateFormatInConvertToJSONStream() throws Exception { + MockProcessContext context = (MockProcessContext) testRunner.getProcessContext(); + ResultSet rs = CassandraQueryTestUtil.createMockDateResultSet(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final String customDateFormat = "yyyy-MM-dd HH:mm:ss.SSSZ"; + context.setProperty(QueryCassandra.TIMESTAMP_FORMAT_PATTERN, customDateFormat); + DateFormat df = new SimpleDateFormat(customDateFormat); + df.setTimeZone(TimeZone.getTimeZone("UTC")); + + long numberOfRows = QueryCassandra.convertToJsonStream(Optional.of(context), rs, baos, StandardCharsets.UTF_8, 0, null); + assertEquals(1, numberOfRows); + + Map<String, List<Map<String, String>>> map = new ObjectMapper().readValue(baos.toByteArray(), HashMap.class); + String date = map.get("results").get(0).get("date"); + assertEquals(df.format(CassandraQueryTestUtil.TEST_DATE), date); + } + private void setUpStandardProcessorConfig() { testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE"); testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "localhost:9042");