This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.15 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 4aefc67ccefe4b018483c89ac018798c35871c2c Author: exceptionfactory <exceptionfact...@apache.org> AuthorDate: Thu Dec 9 12:22:04 2021 -0600 NIFI-9457 Support microseconds for String Timestamps in PutKudu - Implemented override for Timestamp Record Field Type format handling to add support for optional microseconds - Added FieldConverter and ObjectTimestampFieldConverter implementation for generalized Timestamp parsing using DateTimeFormatter - Updated PutKudu unit tests for standard Timestamp and Timestamp with microseconds Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> This closes #5589. --- .../serialization/record/field/FieldConverter.java | 37 +++++++ .../field/ObjectTimestampFieldConverter.java | 86 ++++++++++++++++ .../field/ObjectTimestampFieldConverterTest.java | 114 +++++++++++++++++++++ .../processors/kudu/AbstractKuduProcessor.java | 31 +++++- .../apache/nifi/processors/kudu/TestPutKudu.java | 39 +++++++ 5 files changed, 304 insertions(+), 3 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/FieldConverter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/FieldConverter.java new file mode 100644 index 0000000..7520be3 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/FieldConverter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.serialization.record.field; + +import java.util.Optional; + +/** + * Generalized Field Converter interface for handling type conversion with optional format parsing + * + * @param <I> Input Field Type + * @param <O> Output Field Type + */ +public interface FieldConverter<I, O> { + /** + * Convert Field using Output Field Type with optional format parsing + * + * @param field Input field to be converted + * @param pattern Format pattern optional for parsing + * @param name Input field name for tracking + * @return Converted Field can be null when input field is null or empty + */ + O convertField(I field, Optional<String> pattern, String name); +} diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverter.java new file mode 100644 index 0000000..a624845 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverter.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.serialization.record.field; + +import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; + +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.Date; +import java.util.Optional; + +/** + * Convert Object to java.sql.Timestamp using instanceof evaluation and optional format pattern for DateTimeFormatter + */ +public class ObjectTimestampFieldConverter implements FieldConverter<Object, Timestamp> { + /** + * Convert Object field to java.sql.Timestamp using optional format supported in DateTimeFormatter + * + * @param field Field can be null or a supported input type + * @param pattern Format pattern optional for parsing + * @param name Field name for tracking + * @return Timestamp or null when input field is null or empty string + * @throws IllegalTypeConversionException Thrown on parsing failures or unsupported types of input fields + */ + @Override + public Timestamp convertField(final Object field, final Optional<String> pattern, final String name) { + if (field == null) { + return null; + } + if (field instanceof Timestamp) { + return (Timestamp) field; + } + if (field instanceof Date) { + final Date date = (Date) field; + return new Timestamp(date.getTime()); + } + if (field instanceof Number) { + final Number number = (Number) field; + return new Timestamp(number.longValue()); + } + if (field instanceof String) { + final String string = field.toString().trim(); + if (string.isEmpty()) { + return null; + } + + if (pattern.isPresent()) { + final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern.get()); + try { + final LocalDateTime localDateTime = LocalDateTime.parse(string, formatter); + return Timestamp.valueOf(localDateTime); + } catch (final DateTimeParseException e) { + final String message = String.format("Convert Field Name [%s] Value [%s] to Timestamp LocalDateTime parsing failed: %s", name, field, e.getMessage()); + throw new IllegalTypeConversionException(message); + } + } else { + try { + final long number = Long.parseLong(string); + return new Timestamp(number); + } catch (final NumberFormatException e) { + final String message = String.format("Convert Field Name [%s] Value [%s] to Timestamp Long parsing failed: %s", name, field, e.getMessage()); + throw new IllegalTypeConversionException(message); + } + } + } + + final String message = String.format("Convert Field Name [%s] Value [%s] Class [%s] to Timestamp not supported", name, field, field.getClass()); + throw new IllegalTypeConversionException(message); + } +} diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverterTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverterTest.java new file mode 100644 index 0000000..9424d0f --- /dev/null +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/field/ObjectTimestampFieldConverterTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.serialization.record.field; + +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; +import org.junit.jupiter.api.Test; + +import java.sql.Timestamp; +import java.util.Date; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ObjectTimestampFieldConverterTest { + private static final ObjectTimestampFieldConverter CONVERTER = new ObjectTimestampFieldConverter(); + + private static final Optional<String> DEFAULT_PATTERN = Optional.of(RecordFieldType.TIMESTAMP.getDefaultFormat()); + + private static final String FIELD_NAME = Timestamp.class.getSimpleName(); + + private static final String EMPTY = ""; + + private static final String DATE_TIME_DEFAULT = "2000-01-01 12:00:00"; + + private static final Optional<String> DATE_TIME_NANOSECONDS_PATTERN = Optional.of("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"); + + private static final String DATE_TIME_NANOSECONDS = "2000-01-01 12:00:00.123456789"; + + @Test + public void testConvertFieldNull() { + final Timestamp timestamp = CONVERTER.convertField(null, DEFAULT_PATTERN, FIELD_NAME); + assertNull(timestamp); + } + + @Test + public void testConvertFieldTimestamp() { + final Timestamp field = new Timestamp(System.currentTimeMillis()); + final Timestamp timestamp = CONVERTER.convertField(field, DEFAULT_PATTERN, FIELD_NAME); + assertEquals(field, timestamp); + } + + @Test + public void testConvertFieldDate() { + final Date field = new Date(); + final Timestamp timestamp = CONVERTER.convertField(field, DEFAULT_PATTERN, FIELD_NAME); + assertEquals(field.getTime(), timestamp.getTime()); + } + + @Test + public void testConvertFieldLong() { + final long field = System.currentTimeMillis(); + final Timestamp timestamp = CONVERTER.convertField(field, DEFAULT_PATTERN, FIELD_NAME); + assertEquals(field, timestamp.getTime()); + } + + @Test + public void testConvertFieldStringEmpty() { + final Timestamp timestamp = CONVERTER.convertField(EMPTY, DEFAULT_PATTERN, FIELD_NAME); + assertNull(timestamp); + } + + @Test + public void testConvertFieldStringFormatNull() { + final long currentTime = System.currentTimeMillis(); + final String field = Long.toString(currentTime); + final Timestamp timestamp = CONVERTER.convertField(field, Optional.empty(), FIELD_NAME); + assertEquals(currentTime, timestamp.getTime()); + } + + @Test + public void testConvertFieldStringFormatNullNumberFormatException() { + final String field = String.class.getSimpleName(); + final IllegalTypeConversionException exception = assertThrows(IllegalTypeConversionException.class, () -> CONVERTER.convertField(field, Optional.empty(), FIELD_NAME)); + assertTrue(exception.getMessage().contains(field)); + } + + @Test + public void testConvertFieldStringFormatDefault() { + final Timestamp timestamp = CONVERTER.convertField(DATE_TIME_DEFAULT, DEFAULT_PATTERN, FIELD_NAME); + final Timestamp expected = Timestamp.valueOf(DATE_TIME_DEFAULT); + assertEquals(expected, timestamp); + } + + @Test + public void testConvertFieldStringFormatCustomNanoseconds() { + final Timestamp timestamp = CONVERTER.convertField(DATE_TIME_NANOSECONDS, DATE_TIME_NANOSECONDS_PATTERN, FIELD_NAME); + final Timestamp expected = Timestamp.valueOf(DATE_TIME_NANOSECONDS); + assertEquals(expected, timestamp); + } + + @Test + public void testConvertFieldStringFormatCustomFormatterException() { + final IllegalTypeConversionException exception = assertThrows(IllegalTypeConversionException.class, () -> CONVERTER.convertField(DATE_TIME_DEFAULT, DATE_TIME_NANOSECONDS_PATTERN, FIELD_NAME)); + assertTrue(exception.getMessage().contains(DATE_TIME_DEFAULT)); + } +} diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java index 3d02055..52546fe 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java @@ -50,6 +50,8 @@ import org.apache.nifi.security.krb.KerberosUser; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.field.FieldConverter; +import org.apache.nifi.serialization.record.field.ObjectTimestampFieldConverter; import org.apache.nifi.serialization.record.type.DecimalDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.util.StringUtils; @@ -161,6 +163,10 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); + private static final FieldConverter<Object, Timestamp> TIMESTAMP_FIELD_CONVERTER = new ObjectTimestampFieldConverter(); + /** Timestamp Pattern overrides default RecordFieldType.TIMESTAMP pattern of yyyy-MM-dd HH:mm:ss with optional microseconds */ + private static final String MICROSECOND_TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss[.SSSSSS]"; + private volatile KuduClient kuduClient; private final ReadWriteLock kuduClientReadWriteLock = new ReentrantReadWriteLock(); private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock(); @@ -417,9 +423,9 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { row.addLong(columnIndex, DataTypeUtils.toLong(value, recordFieldName)); break; case UNIXTIME_MICROS: - DataType fieldType = record.getSchema().getDataType(recordFieldName).get(); - Timestamp timestamp = DataTypeUtils.toTimestamp(record.getValue(recordFieldName), - () -> DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName); + final Optional<DataType> optionalDataType = record.getSchema().getDataType(recordFieldName); + final Optional<String> optionalPattern = getTimestampPattern(optionalDataType); + final Timestamp timestamp = TIMESTAMP_FIELD_CONVERTER.convertField(value, optionalPattern, recordFieldName); row.addTimestamp(columnIndex, timestamp); break; case STRING: @@ -452,6 +458,25 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { } /** + * Get Timestamp Pattern and override Timestamp Record Field pattern with optional microsecond pattern + * + * @param optionalDataType Optional Data Type + * @return Optional Timestamp Pattern + */ + private Optional<String> getTimestampPattern(final Optional<DataType> optionalDataType) { + String pattern = null; + if (optionalDataType.isPresent()) { + final DataType dataType = optionalDataType.get(); + if (RecordFieldType.TIMESTAMP == dataType.getFieldType()) { + pattern = MICROSECOND_TIMESTAMP_PATTERN; + } else { + pattern = dataType.getFormat(); + } + } + return Optional.ofNullable(pattern); + } + + /** * Get java.sql.Date from Record Field Value with optional parsing when input value is a String * * @param value Record Field Value diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java index 126ca6e..8006907 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java @@ -96,6 +96,10 @@ public class TestPutKudu { private static final String ISO_8601_YEAR_MONTH_DAY = "2000-01-01"; private static final String ISO_8601_YEAR_MONTH_DAY_PATTERN = "yyyy-MM-dd"; + private static final String TIMESTAMP_FIELD = "updated"; + private static final String TIMESTAMP_STANDARD = "2000-01-01 12:00:00"; + private static final String TIMESTAMP_MICROSECONDS = "2000-01-01 12:00:00.123456"; + private TestRunner testRunner; private MockPutKudu processor; @@ -525,6 +529,41 @@ public class TestPutKudu { assertPartialRowDateFieldEquals(ISO_8601_YEAR_MONTH_DAY); } + @Test + public void testBuildPartialRowWithTimestampStandardString() { + assertPartialRowTimestampFieldEquals(TIMESTAMP_STANDARD); + } + + @Test + public void testBuildPartialRowWithTimestampMicrosecondsString() { + assertPartialRowTimestampFieldEquals(TIMESTAMP_MICROSECONDS); + } + + private void assertPartialRowTimestampFieldEquals(final Object timestampFieldValue) { + final PartialRow row = buildPartialRowTimestampField(timestampFieldValue); + final Timestamp timestamp = row.getTimestamp(TIMESTAMP_FIELD); + final Timestamp expected = Timestamp.valueOf(timestampFieldValue.toString()); + assertEquals("Partial Row Timestamp Field not matched", expected, timestamp); + } + + private PartialRow buildPartialRowTimestampField(final Object timestampFieldValue) { + final Schema kuduSchema = new Schema(Collections.singletonList( + new ColumnSchema.ColumnSchemaBuilder(TIMESTAMP_FIELD, Type.UNIXTIME_MICROS).nullable(true).build() + )); + + final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList( + new RecordField(TIMESTAMP_FIELD, RecordFieldType.TIMESTAMP.getDataType()) + )); + + final Map<String, Object> values = new HashMap<>(); + values.put(TIMESTAMP_FIELD, timestampFieldValue); + final MapRecord record = new MapRecord(schema, values); + + final PartialRow row = kuduSchema.newPartialRow(); + processor.buildPartialRow(kuduSchema, row, record, schema.getFieldNames(), true, true); + return row; + } + private void assertPartialRowDateFieldEquals(final Object dateFieldValue) { final PartialRow row = buildPartialRowDateField(dateFieldValue, Type.DATE); final java.sql.Date rowDate = row.getDate(DATE_FIELD);