This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new b6d044853c NIFI-12885 Added Record Methods for Local and Offset Dates (#8502) b6d044853c is described below commit b6d044853cfcf568c246362bb5a337ea3bf2d951 Author: knguyen1 <yuram...@gmail.com> AuthorDate: Thu Apr 25 00:23:54 2024 -0400 NIFI-12885 Added Record Methods for Local and Offset Dates (#8502) Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../nifi/serialization/record/MapRecord.java | 24 +++- .../apache/nifi/serialization/record/Record.java | 11 +- .../record/ResultSetRecordSetTest.java | 5 +- .../nifi/serialization/record/TestMapRecord.java | 126 +++++++++++++++++---- .../serialization/record/util/TriFunction.java | 32 ++++++ .../nifi/processors/standard/TestQueryRecord.java | 2 +- 6 files changed, 167 insertions(+), 33 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java index ee5a5991d0..359d574990 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java @@ -32,11 +32,12 @@ import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -279,10 +280,23 @@ public class MapRecord implements Record { } @Override - public Date getAsDate(final String fieldName, final String format) { - final FieldConverter<Object, LocalDate> converter = StandardFieldConverterRegistry.getRegistry().getFieldConverter(LocalDate.class); - final LocalDate localDate = converter.convertField(getValue(fieldName), Optional.ofNullable(format), fieldName); - return localDate == null ? null : java.sql.Date.valueOf(localDate); + public LocalDate getAsLocalDate(final String fieldName, final String format) { + return convertFieldToDateTime(LocalDate.class, fieldName, format); + } + + @Override + public LocalDateTime getAsLocalDateTime(String fieldName, String format) { + return convertFieldToDateTime(LocalDateTime.class, fieldName, format); + } + + @Override + public OffsetDateTime getAsOffsetDateTime(final String fieldName, final String format) { + return convertFieldToDateTime(OffsetDateTime.class, fieldName, format); + } + + private <T> T convertFieldToDateTime(Class<T> clazz, String fieldName, String format) { + final FieldConverter<Object, T> converter = StandardFieldConverterRegistry.getRegistry().getFieldConverter(clazz); + return converter.convertField(getValue(fieldName), Optional.ofNullable(format), fieldName); } @Override diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java index 8aebe8275e..0133b0175c 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java @@ -17,10 +17,13 @@ package org.apache.nifi.serialization.record; -import java.util.Date; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; import java.util.Map; import java.util.Optional; import java.util.Set; + import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; public interface Record { @@ -107,7 +110,11 @@ public interface Record { Boolean getAsBoolean(String fieldName); - Date getAsDate(String fieldName, String format); + LocalDate getAsLocalDate(String fieldName, String format); + + LocalDateTime getAsLocalDateTime(String fieldName, String format); + + OffsetDateTime getAsOffsetDateTime(String fieldName, String format); Object[] getAsArray(String fieldName); diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java index fa594a85a6..b4fe5209dd 100644 --- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java @@ -275,7 +275,6 @@ public class ResultSetRecordSetTest { final Boolean bitValue = Boolean.FALSE; final Boolean booleanValue = Boolean.TRUE; final Character charValue = 'c'; - final Date dateValue = Date.valueOf(testDate); final Timestamp timestampValue = Timestamp.valueOf(testDateTime); final Integer integerValue = 1234567890; final Double doubleValue = 0.12; @@ -295,7 +294,7 @@ public class ResultSetRecordSetTest { when(resultSet.getObject(COLUMN_NAME_BIT)).thenReturn(bitValue); when(resultSet.getObject(COLUMN_NAME_BOOLEAN)).thenReturn(booleanValue); when(resultSet.getObject(COLUMN_NAME_CHAR)).thenReturn(charValue); - when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(dateValue); + when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(testDate); when(resultSet.getTimestamp(COLUMN_NAME_TIMESTAMP)).thenReturn(timestampValue); when(resultSet.getObject(COLUMN_NAME_INTEGER)).thenReturn(integerValue); when(resultSet.getObject(COLUMN_NAME_DOUBLE)).thenReturn(doubleValue); @@ -319,7 +318,7 @@ public class ResultSetRecordSetTest { assertEquals(booleanValue, record.getAsBoolean(COLUMN_NAME_BOOLEAN)); assertEquals(charValue, record.getValue(COLUMN_NAME_CHAR)); - assertEquals(dateValue, record.getAsDate(COLUMN_NAME_DATE, null)); + assertEquals(testDate, record.getAsLocalDate(COLUMN_NAME_DATE, null)); final Object timestampObject = record.getValue(COLUMN_NAME_TIMESTAMP); assertEquals(timestampValue, timestampObject); diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java index b343ab0647..58a0b53061 100644 --- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java @@ -21,8 +21,16 @@ import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.TriFunction; import org.junit.jupiter.api.Test; - +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.util.Arrays; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; @@ -30,6 +38,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -38,7 +47,11 @@ import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class TestMapRecord { +class TestMapRecord { + + private static final String ISO_LOCAL_DATE = "yyyy-MM-dd"; + private static final String ISO_LOCAL_DATE_TIME = "yyyy-MM-dd'T'HH:mm:ss.SSS"; + private static final String ISO_OFFSET_DATE_TIME = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; private static final List<RecordField> STRING_NUMBER_FIELDS = List.of( new RecordField("string", RecordFieldType.STRING.getDataType()), @@ -47,7 +60,7 @@ public class TestMapRecord { @Test - public void testRenameClearsSerializedForm() { + void testRenameClearsSerializedForm() { final Map<String, Object> values = new HashMap<>(Map.of("string", "hello", "number", 8)); final RecordSchema schema = new SimpleRecordSchema(STRING_NUMBER_FIELDS); final Record record = new MapRecord(schema, values, SerializedForm.of("Hello there", "text/unit-test")); @@ -58,7 +71,7 @@ public class TestMapRecord { } @Test - public void testRemoveClearsSerializedForm() { + void testRemoveClearsSerializedForm() { final Map<String, Object> values = new HashMap<>(Map.of("string", "hello", "number", 8)); final RecordSchema schema = new SimpleRecordSchema(STRING_NUMBER_FIELDS); final Record record = new MapRecord(schema, values, SerializedForm.of("Hello there", "text/unit-test")); @@ -69,7 +82,7 @@ public class TestMapRecord { } @Test - public void testRenameRemoveInvalidFieldsToNotClearSerializedForm() { + void testRenameRemoveInvalidFieldsToNotClearSerializedForm() { final Map<String, Object> values = new HashMap<>(Map.of("string", "hello", "number", 8)); final RecordSchema schema = new SimpleRecordSchema(STRING_NUMBER_FIELDS); final Record record = new MapRecord(schema, values, SerializedForm.of("Hello there", "text/unit-test")); @@ -85,7 +98,7 @@ public class TestMapRecord { } @Test - public void testIncorporateInactiveFieldsWithUpdate() { + void testIncorporateInactiveFieldsWithUpdate() { final Map<String, Object> values = new HashMap<>(Map.of("string", "hello", "number", 8)); final RecordSchema schema = new SimpleRecordSchema(STRING_NUMBER_FIELDS); final Record record = new MapRecord(schema, values, SerializedForm.of("Hello there", "text/unit-test")); @@ -105,7 +118,7 @@ public class TestMapRecord { } @Test - public void testIncorporateInactiveFieldsWithConflict() { + void testIncorporateInactiveFieldsWithConflict() { final Map<String, Object> values = new HashMap<>(Map.of("string", "hello", "number", 8)); final RecordSchema schema = new SimpleRecordSchema(STRING_NUMBER_FIELDS); final Record record = new MapRecord(schema, values, SerializedForm.of("Hello there", "text/unit-test")); @@ -127,7 +140,7 @@ public class TestMapRecord { } @Test - public void testDefaultValue() { + void testDefaultValue() { final List<RecordField> fields = new ArrayList<>(); fields.add(new RecordField("noDefault", RecordFieldType.STRING.getDataType())); fields.add(new RecordField("defaultOfHello", RecordFieldType.STRING.getDataType(), "hello")); @@ -141,7 +154,7 @@ public class TestMapRecord { } @Test - public void testDefaultValueInGivenField() { + void testDefaultValueInGivenField() { final List<RecordField> fields = new ArrayList<>(); fields.add(new RecordField("noDefault", RecordFieldType.STRING.getDataType())); fields.add(new RecordField("defaultOfHello", RecordFieldType.STRING.getDataType(), "hello")); @@ -158,7 +171,7 @@ public class TestMapRecord { } @Test - public void testIllegalDefaultValue() { + void testIllegalDefaultValue() { new RecordField("hello", RecordFieldType.STRING.getDataType(), 84); new RecordField("hello", RecordFieldType.STRING.getDataType(), (Object) null); new RecordField("hello", RecordFieldType.INT.getDataType(), 84); @@ -168,15 +181,11 @@ public class TestMapRecord { } private Set<String> set(final String... values) { - final Set<String> set = new LinkedHashSet<>(); - for (final String value : values) { - set.add(value); - } - return set; + return new LinkedHashSet<>(Arrays.asList(values)); } @Test - public void testAliasOneValue() { + void testAliasOneValue() { final List<RecordField> fields = new ArrayList<>(); fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz"))); @@ -191,7 +200,7 @@ public class TestMapRecord { } @Test - public void testAliasConflictingValues() { + void testAliasConflictingValues() { final List<RecordField> fields = new ArrayList<>(); fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz"))); @@ -207,7 +216,7 @@ public class TestMapRecord { } @Test - public void testAliasConflictingAliasValues() { + void testAliasConflictingAliasValues() { final List<RecordField> fields = new ArrayList<>(); fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz"))); @@ -223,7 +232,7 @@ public class TestMapRecord { } @Test - public void testAliasInGivenField() { + void testAliasInGivenField() { final List<RecordField> fields = new ArrayList<>(); fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz"))); @@ -246,7 +255,7 @@ public class TestMapRecord { @Test - public void testDefaultValueWithAliasValue() { + void testDefaultValueWithAliasValue() { final List<RecordField> fields = new ArrayList<>(); fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz"))); @@ -262,7 +271,7 @@ public class TestMapRecord { } @Test - public void testDefaultValueWithAliasesDefined() { + void testDefaultValueWithAliasesDefined() { final List<RecordField> fields = new ArrayList<>(); fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz"))); @@ -275,7 +284,7 @@ public class TestMapRecord { } @Test - public void testNestedSchema() { + void testNestedSchema() { final String FOO_TEST_VAL = "test!"; final String NESTED_RECORD_VALUE = "Hello, world!"; @@ -323,4 +332,77 @@ public class TestMapRecord { } } + @ParameterizedTest + @MethodSource("provideLocalDates") + void testGettingLocalDate(final String input, final String format, LocalDate expectedDate) { + executeDateTimeTest(input, format, expectedDate, MapRecord::getAsLocalDate); + } + + @ParameterizedTest + @MethodSource("provideLocalDateTimes") + void testGettingLocalDateTime(final String input, final String format, LocalDateTime expectedDateTime) { + executeDateTimeTest(input, format, expectedDateTime, MapRecord::getAsLocalDateTime); + } + + @ParameterizedTest + @MethodSource("provideOffsetDateTimes") + void testGettingOffsetDateTime(final String input, final String format, OffsetDateTime expectedOffsetDateTime) { + executeDateTimeTest(input, format, expectedOffsetDateTime, MapRecord::getAsOffsetDateTime); + } + + private <T> void executeDateTimeTest(final String input, + final String format, + final Object expectedDateTime, + TriFunction<MapRecord, String, String, T> dateTimeFunction) { + // create a `MapRecord` from the input + final List<RecordField> fields = new ArrayList<>(); + final String timestampFieldName = "timestamp"; + fields.add(new RecordField(timestampFieldName, RecordFieldType.TIMESTAMP.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + final HashMap<String, Object> item = new HashMap<>(); + item.put(timestampFieldName, input); + final MapRecord testRecord = new MapRecord(schema, item); + + // apply the datetime function to the record and compare + final T actualDateTime = dateTimeFunction.apply(testRecord, timestampFieldName, format); + assertEquals(expectedDateTime, actualDateTime); + } + + private static Stream<Arguments> provideLocalDates() { + return Stream.of( + Arguments.of("2022-01-01", ISO_LOCAL_DATE, LocalDate.parse("2022-01-01")), + Arguments.of("2022-01-01T12:34:56.789", ISO_LOCAL_DATE_TIME, LocalDate.parse("2022-01-01")), + Arguments.of("2017-06-23T01:02:03.456", ISO_LOCAL_DATE_TIME, LocalDate.parse("2017-06-23")), + Arguments.of("2020-02-29T23:59:59.999", ISO_LOCAL_DATE_TIME, LocalDate.parse("2020-02-29")), // leap year + Arguments.of("2024-03-10T02:00:00.000", ISO_LOCAL_DATE_TIME, LocalDate.parse("2024-03-10")), // DST transition + // test minimum and maximum values + Arguments.of("0001-01-01T00:00:00.000", ISO_LOCAL_DATE_TIME, LocalDate.parse("0001-01-01")), + Arguments.of("9999-12-31T23:59:59.999", ISO_LOCAL_DATE_TIME, LocalDate.parse("9999-12-31")) + ); + } + + private static Stream<Arguments> provideLocalDateTimes() { + return Stream.of( + Arguments.of("2022-01-01T12:34:56.789", ISO_LOCAL_DATE_TIME, LocalDateTime.parse("2022-01-01T12:34:56.789")), + Arguments.of("2017-06-23T01:02:03.456", ISO_LOCAL_DATE_TIME, LocalDateTime.parse("2017-06-23T01:02:03.456")), + Arguments.of("2020-02-29T23:59:59.999", ISO_LOCAL_DATE_TIME, LocalDateTime.parse("2020-02-29T23:59:59.999")), // leap year + Arguments.of("2024-03-10T02:00:00.000", ISO_LOCAL_DATE_TIME, LocalDateTime.parse("2024-03-10T02:00:00.000")), // DST transition + // test minimum and maximum values + Arguments.of("0001-01-01T00:00:00.000", ISO_LOCAL_DATE_TIME, LocalDateTime.parse("0001-01-01T00:00:00.000")), + Arguments.of("9999-12-31T23:59:59.999", ISO_LOCAL_DATE_TIME, LocalDateTime.parse("9999-12-31T23:59:59.999")) + ); + } + + private static Stream<Arguments> provideOffsetDateTimes() { + return Stream.of( + Arguments.of("2022-01-01T12:34:56.789+00:00", ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("2022-01-01T12:34:56.789+00:00")), + Arguments.of("2017-06-23T01:02:03.456+00:00", ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("2017-06-23T01:02:03.456+00:00")), + Arguments.of("2020-02-29T23:59:59.999+00:00", ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("2020-02-29T23:59:59.999+00:00")), // leap year + Arguments.of("2024-03-10T02:00:00.000+00:00", ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("2024-03-10T02:00:00.000+00:00")), // DST transition + // test minimum and maximum values + Arguments.of("0001-01-01T00:00:00.000+00:00", ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("0001-01-01T00:00:00.000+00:00")), + Arguments.of("9999-12-31T23:59:59.999+00:00", ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("9999-12-31T23:59:59.999+00:00")) + ); + } } diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/util/TriFunction.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/util/TriFunction.java new file mode 100644 index 0000000000..feb8c227d9 --- /dev/null +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/util/TriFunction.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record.util; + +import java.util.Objects; +import java.util.function.Function; + +@FunctionalInterface +public interface TriFunction<S, T, U, R> { + + R apply(S s, T t, U u); + + default <V> TriFunction<S, T, U, V> andThen(Function<? super R, ? extends V> after) { + Objects.requireNonNull(after); + return (S s, T t, U u) -> after.apply(apply(s, t, u)); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java index 86c1dca4df..308bd2858f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -135,7 +135,7 @@ public class TestQueryRecord { assertArrayEquals(new String[] { "red", "green"}, (Object[]) output.getValue("colors")); assertArrayEquals(new String[] { "John Doe", "Jane Doe"}, (Object[]) output.getValue("names")); - assertEquals(java.sql.Date.valueOf(ISO_DATE), output.getAsDate("joinTime", ISO_DATE_FORMAT)); + assertEquals(java.time.LocalDate.parse(ISO_DATE), output.getAsLocalDate("joinTime", ISO_DATE_FORMAT)); assertEquals(Double.valueOf(180.8D), output.getAsDouble("weight")); }