This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 3ebe8f2983 NIFI-11538: This closes #7239. Fix primitive type conversion for PutIceberg 3ebe8f2983 is described below commit 3ebe8f2983f1214a2062da6d698d85d8587c8a50 Author: Matthew Burgess <mattyb...@apache.org> AuthorDate: Thu May 11 12:10:47 2023 -0400 NIFI-11538: This closes #7239. Fix primitive type conversion for PutIceberg --- .../iceberg/converter/GenericDataConverters.java | 94 ++++++++++++++----- .../iceberg/converter/IcebergRecordConverter.java | 15 +-- .../iceberg/TestIcebergRecordConverter.java | 101 +++++++++++++++++++++ .../src/test/resources/user.avsc | 2 +- 4 files changed, 183 insertions(+), 29 deletions(-) diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java index 162d2a38b1..8854bf6e0c 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java @@ -19,15 +19,16 @@ package org.apache.nifi.processors.iceberg.converter; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.Validate; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; import java.math.BigDecimal; import java.nio.ByteBuffer; -import 
java.sql.Time; import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.LocalTime; @@ -47,45 +48,91 @@ import static org.apache.nifi.processors.iceberg.converter.RecordFieldGetter.cre */ public class GenericDataConverters { - static class SameTypeConverter extends DataConverter<Object, Object> { + static class PrimitiveTypeConverter extends DataConverter<Object, Object> { + final Type.PrimitiveType targetType; + final DataType sourceType; + + public PrimitiveTypeConverter(final Type.PrimitiveType type, final DataType dataType) { + targetType = type; + sourceType = dataType; + } @Override public Object convert(Object data) { - return data; + switch (targetType.typeId()) { + case BOOLEAN: + return DataTypeUtils.toBoolean(data, null); + case INTEGER: + return DataTypeUtils.toInteger(data, null); + case LONG: + return DataTypeUtils.toLong(data, null); + case FLOAT: + return DataTypeUtils.toFloat(data, null); + case DOUBLE: + return DataTypeUtils.toDouble(data, null); + case DATE: + return DataTypeUtils.toLocalDate(data, () -> DataTypeUtils.getDateTimeFormatter(sourceType.getFormat(), ZoneId.systemDefault()), null); + case UUID: + return DataTypeUtils.toUUID(data); + case STRING: + default: + return DataTypeUtils.toString(data, () -> null); + } } } - static class TimeConverter extends DataConverter<Time, LocalTime> { + static class TimeConverter extends DataConverter<Object, LocalTime> { + + private final String timeFormat; + + public TimeConverter(final String format) { + this.timeFormat = format; + } @Override - public LocalTime convert(Time data) { - return data.toLocalTime(); + public LocalTime convert(Object data) { + return DataTypeUtils.toTime(data, () -> DataTypeUtils.getDateFormat(timeFormat), null).toLocalTime(); } } - static class TimestampConverter extends DataConverter<Timestamp, LocalDateTime> { + static class TimestampConverter extends DataConverter<Object, LocalDateTime> { + + private final DataType dataType; + + public 
TimestampConverter(final DataType dataType) { + this.dataType = dataType; + } @Override - public LocalDateTime convert(Timestamp data) { - return data.toLocalDateTime(); + public LocalDateTime convert(Object data) { + final Timestamp convertedTimestamp = DataTypeUtils.toTimestamp(data, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), null); + return convertedTimestamp.toLocalDateTime(); } } - static class TimestampWithTimezoneConverter extends DataConverter<Timestamp, OffsetDateTime> { + static class TimestampWithTimezoneConverter extends DataConverter<Object, OffsetDateTime> { + + private final DataType dataType; + + public TimestampWithTimezoneConverter(final DataType dataType) { + this.dataType = dataType; + } @Override - public OffsetDateTime convert(Timestamp data) { - return OffsetDateTime.ofInstant(data.toInstant(), ZoneId.of("UTC")); + public OffsetDateTime convert(Object data) { + final Timestamp convertedTimestamp = DataTypeUtils.toTimestamp(data, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), null); + return OffsetDateTime.ofInstant(convertedTimestamp.toInstant(), ZoneId.of("UTC")); } } - static class UUIDtoByteArrayConverter extends DataConverter<UUID, byte[]> { + static class UUIDtoByteArrayConverter extends DataConverter<Object, byte[]> { @Override - public byte[] convert(UUID data) { + public byte[] convert(Object data) { + final UUID uuid = DataTypeUtils.toUUID(data); ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]); - byteBuffer.putLong(data.getMostSignificantBits()); - byteBuffer.putLong(data.getLeastSignificantBits()); + byteBuffer.putLong(uuid.getMostSignificantBits()); + byteBuffer.putLong(uuid.getLeastSignificantBits()); return byteBuffer.array(); } } @@ -113,7 +160,7 @@ public class GenericDataConverters { } } - static class BigDecimalConverter extends DataConverter<BigDecimal, BigDecimal> { + static class BigDecimalConverter extends DataConverter<Object, BigDecimal> { private final int precision; private final int 
scale; @@ -123,10 +170,15 @@ public class GenericDataConverters { } @Override - public BigDecimal convert(BigDecimal data) { - Validate.isTrue(data.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, data); - Validate.isTrue(data.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision: %s", precision, scale, data); - return data; + public BigDecimal convert(Object data) { + if (data instanceof BigDecimal) { + BigDecimal bigDecimal = (BigDecimal) data; + Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, bigDecimal.scale(), data); + Validate.isTrue(bigDecimal.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision %s for value: %s", + precision, scale, bigDecimal.precision(), data); + return bigDecimal; + } + return DataTypeUtils.toBigDecimal(data, null); } } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java index 6a74eca4fd..33049123ef 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java @@ -46,7 +46,6 @@ import java.util.stream.Collectors; public class IcebergRecordConverter { private final DataConverter<Record, GenericRecord> converter; - public GenericRecord convert(Record record) { return converter.convert(record); } @@ -85,21 +84,21 @@ public class IcebergRecordConverter { case DOUBLE: case DATE: case STRING: - return new GenericDataConverters.SameTypeConverter(); + return 
new GenericDataConverters.PrimitiveTypeConverter(type, dataType); case TIME: - return new GenericDataConverters.TimeConverter(); + return new GenericDataConverters.TimeConverter(dataType.getFormat()); case TIMESTAMP: final Types.TimestampType timestampType = (Types.TimestampType) type; if (timestampType.shouldAdjustToUTC()) { - return new GenericDataConverters.TimestampWithTimezoneConverter(); + return new GenericDataConverters.TimestampWithTimezoneConverter(dataType); } - return new GenericDataConverters.TimestampConverter(); + return new GenericDataConverters.TimestampConverter(dataType); case UUID: final UUIDDataType uuidType = (UUIDDataType) dataType; if (uuidType.getFileFormat() == FileFormat.PARQUET) { return new GenericDataConverters.UUIDtoByteArrayConverter(); } - return new GenericDataConverters.SameTypeConverter(); + return new GenericDataConverters.PrimitiveTypeConverter(type, dataType); case FIXED: final Types.FixedType fixedType = (Types.FixedType) type; return new GenericDataConverters.FixedConverter(fixedType.length()); @@ -167,7 +166,9 @@ public class IcebergRecordConverter { return new RecordTypeWithFieldNameMapper(new Schema(schema.findField(fieldId).type().asStructType().fields()), (RecordDataType) field.getDataType()); } - if (field.getDataType().getFieldType().equals(RecordFieldType.UUID)) { + // If the source field or target field is of type UUID, create a UUIDDataType from it + if (field.getDataType().getFieldType().equals(RecordFieldType.UUID) + || schema.findField(fieldId).type().typeId() == Type.TypeID.UUID) { return new UUIDDataType(field.getDataType(), fileFormat); } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java index f69946e2b8..18690ffe78 100644 --- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java @@ -67,6 +67,7 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.OffsetDateTime; +import java.time.ZoneId; import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collection; @@ -143,6 +144,23 @@ public class TestIcebergRecordConverter { Types.NestedField.optional(14, "choice", Types.IntegerType.get()) ); + private static final Schema COMPATIBLE_PRIMITIVES_SCHEMA = new Schema( + Types.NestedField.optional(0, "string", Types.StringType.get()), + Types.NestedField.optional(1, "integer", Types.IntegerType.get()), + Types.NestedField.optional(2, "float", Types.FloatType.get()), + Types.NestedField.optional(3, "long", Types.LongType.get()), + Types.NestedField.optional(4, "double", Types.DoubleType.get()), + Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10, 2)), + Types.NestedField.optional(7, "fixed", Types.FixedType.ofLength(5)), + Types.NestedField.optional(8, "binary", Types.BinaryType.get()), + Types.NestedField.optional(9, "date", Types.DateType.get()), + Types.NestedField.optional(10, "time", Types.TimeType.get()), + Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()), + Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()), + Types.NestedField.optional(13, "uuid", Types.UUIDType.get()), + Types.NestedField.optional(14, "choice", Types.IntegerType.get()) + ); + private static final Schema CASE_INSENSITIVE_SCHEMA = new Schema( Types.NestedField.optional(0, "FIELD1", Types.StringType.get()), Types.NestedField.optional(1, "Field2", Types.StringType.get()), @@ -221,6 +239,26 @@ public class TestIcebergRecordConverter { return new 
SimpleRecordSchema(fields); } + private static RecordSchema getPrimitivesAsCompatiblesSchema() { + List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("string", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("integer", RecordFieldType.SHORT.getDataType())); + fields.add(new RecordField("float", RecordFieldType.DOUBLE.getDataType())); + fields.add(new RecordField("long", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("double", RecordFieldType.FLOAT.getDataType())); + fields.add(new RecordField("decimal", RecordFieldType.DOUBLE.getDataType())); + fields.add(new RecordField("fixed", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))); + fields.add(new RecordField("binary", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))); + fields.add(new RecordField("date", RecordFieldType.STRING.getDataType("yyyy-MM-dd"))); + fields.add(new RecordField("time", RecordFieldType.STRING.getDataType("hh:mm:ss.SSS"))); + fields.add(new RecordField("timestamp", RecordFieldType.STRING.getDataType("yyyy-MM-dd hh:mm:ss.SSSZ"))); + fields.add(new RecordField("timestampTz", RecordFieldType.STRING.getDataType("yyyy-MM-dd hh:mm:ss.SSSZ"))); + fields.add(new RecordField("uuid", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("choice", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType()))); + + return new SimpleRecordSchema(fields); + } + private static RecordSchema getCaseInsensitiveSchema() { List<RecordField> fields = new ArrayList<>(); fields.add(new RecordField("field1", RecordFieldType.STRING.getDataType())); @@ -331,6 +369,27 @@ public class TestIcebergRecordConverter { return new MapRecord(getPrimitivesSchema(), values); } + private static Record setupCompatiblePrimitivesTestRecord() { + + Map<String, Object> values = new HashMap<>(); + values.put("string", 123); + values.put("integer", 8); + 
values.put("float", 1.23456); + values.put("long", 42L); + values.put("double", 3.14159); + values.put("decimal", 12345678.12); + values.put("fixed", "hello".getBytes()); + values.put("binary", "hello".getBytes()); + values.put("date", "2017-04-04"); + values.put("time", "14:20:33.000"); + values.put("timestamp", "2017-04-04 14:20:33.789-0500"); + values.put("timestampTz", "2017-04-04 14:20:33.789-0500"); + values.put("uuid", "0000-00-00-00-000000"); + values.put("choice", "10"); + + return new MapRecord(getPrimitivesAsCompatiblesSchema(), values); + } + private static Record setupCaseInsensitiveTestRecord() { Map<String, Object> values = new HashMap<>(); values.put("field1", "Text1"); @@ -414,6 +473,48 @@ public class TestIcebergRecordConverter { } } + @DisabledOnOs(WINDOWS) + @ParameterizedTest + @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"}) + public void testCompatiblePrimitives(FileFormat format) throws IOException { + RecordSchema nifiSchema = getPrimitivesAsCompatiblesSchema(); + Record record = setupCompatiblePrimitivesTestRecord(); + + IcebergRecordConverter recordConverter = new IcebergRecordConverter(COMPATIBLE_PRIMITIVES_SCHEMA, nifiSchema, format); + GenericRecord genericRecord = recordConverter.convert(record); + + writeTo(format, COMPATIBLE_PRIMITIVES_SCHEMA, genericRecord, tempFile); + + List<GenericRecord> results = readFrom(format, COMPATIBLE_PRIMITIVES_SCHEMA, tempFile.toInputFile()); + + assertEquals(results.size(), 1); + GenericRecord resultRecord = results.get(0); + + LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000); + OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5)); + LocalDateTime expectedLocalDateTimestamp = offsetDateTime.atZoneSameInstant(ZoneId.systemDefault()).toLocalDateTime(); + + assertEquals("123", resultRecord.get(0, String.class)); + assertEquals(Integer.valueOf(8), resultRecord.get(1, Integer.class)); + 
assertEquals(Float.valueOf(1.23456F), resultRecord.get(2, Float.class)); + assertEquals(Long.valueOf(42L), resultRecord.get(3, Long.class)); + assertEquals(Double.valueOf(3.141590118408203), resultRecord.get(4, Double.class)); + assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class)); + assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(6, byte[].class)); + assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, ByteBuffer.class).array()); + assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(8, LocalDate.class)); + assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(9, LocalTime.class)); + assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(10, OffsetDateTime.class)); + assertEquals(expectedLocalDateTimestamp, resultRecord.get(11, LocalDateTime.class)); + assertEquals(Integer.valueOf(10), resultRecord.get(13, Integer.class)); + + if (format.equals(PARQUET)) { + assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(12, byte[].class)); + } else { + assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(12, UUID.class)); + } + } + @DisabledOnOs(WINDOWS) @ParameterizedTest @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"}) diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc index 5cdac5fe8c..c537a9e496 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc @@ -19,7 +19,7 @@ "type": "record", "name": "User", "fields": [ - {"name": "id", "type": ["int", "null"]}, + {"name": "id", "type": ["long", "null"]}, {"name": "name", "type": ["string", "null"]}, {"name": "department", "type": ["string", "null"]} ]