This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ebe8f2983 NIFI-11538: This closes #7239. Fix primitive type conversion for PutIceberg
3ebe8f2983 is described below

commit 3ebe8f2983f1214a2062da6d698d85d8587c8a50
Author: Matthew Burgess <mattyb...@apache.org>
AuthorDate: Thu May 11 12:10:47 2023 -0400

    NIFI-11538: This closes #7239. Fix primitive type conversion for PutIceberg
---
 .../iceberg/converter/GenericDataConverters.java   |  94 ++++++++++++++-----
 .../iceberg/converter/IcebergRecordConverter.java  |  15 +--
 .../iceberg/TestIcebergRecordConverter.java        | 101 +++++++++++++++++++++
 .../src/test/resources/user.avsc                   |   2 +-
 4 files changed, 183 insertions(+), 29 deletions(-)

diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
index 162d2a38b1..8854bf6e0c 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
@@ -19,15 +19,16 @@ package org.apache.nifi.processors.iceberg.converter;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.Validate;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
@@ -47,45 +48,91 @@ import static org.apache.nifi.processors.iceberg.converter.RecordFieldGetter.cre
  */
 public class GenericDataConverters {
 
-    static class SameTypeConverter extends DataConverter<Object, Object> {
+    static class PrimitiveTypeConverter extends DataConverter<Object, Object> {
+        final Type.PrimitiveType targetType;
+        final DataType sourceType;
+
+        public PrimitiveTypeConverter(final Type.PrimitiveType type, final DataType dataType) {
+            targetType = type;
+            sourceType = dataType;
+        }
 
         @Override
         public Object convert(Object data) {
-            return data;
+            switch (targetType.typeId()) {
+                case BOOLEAN:
+                    return DataTypeUtils.toBoolean(data, null);
+                case INTEGER:
+                    return DataTypeUtils.toInteger(data, null);
+                case LONG:
+                    return DataTypeUtils.toLong(data, null);
+                case FLOAT:
+                    return DataTypeUtils.toFloat(data, null);
+                case DOUBLE:
+                    return DataTypeUtils.toDouble(data, null);
+                case DATE:
+                    return DataTypeUtils.toLocalDate(data, () -> DataTypeUtils.getDateTimeFormatter(sourceType.getFormat(), ZoneId.systemDefault()), null);
+                case UUID:
+                    return DataTypeUtils.toUUID(data);
+                case STRING:
+                default:
+                    return DataTypeUtils.toString(data, () -> null);
+            }
         }
     }
 
-    static class TimeConverter extends DataConverter<Time, LocalTime> {
+    static class TimeConverter extends DataConverter<Object, LocalTime> {
+
+        private final String timeFormat;
+
+        public TimeConverter(final String format) {
+            this.timeFormat = format;
+        }
 
         @Override
-        public LocalTime convert(Time data) {
-            return data.toLocalTime();
+        public LocalTime convert(Object data) {
+            return DataTypeUtils.toTime(data, () -> DataTypeUtils.getDateFormat(timeFormat), null).toLocalTime();
         }
     }
 
-    static class TimestampConverter extends DataConverter<Timestamp, LocalDateTime> {
+    static class TimestampConverter extends DataConverter<Object, LocalDateTime> {
+
+        private final DataType dataType;
+
+        public TimestampConverter(final DataType dataType) {
+            this.dataType = dataType;
+        }
 
         @Override
-        public LocalDateTime convert(Timestamp data) {
-            return data.toLocalDateTime();
+        public LocalDateTime convert(Object data) {
+            final Timestamp convertedTimestamp = DataTypeUtils.toTimestamp(data, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), null);
+            return convertedTimestamp.toLocalDateTime();
         }
     }
 
-    static class TimestampWithTimezoneConverter extends DataConverter<Timestamp, OffsetDateTime> {
+    static class TimestampWithTimezoneConverter extends DataConverter<Object, OffsetDateTime> {
+
+        private final DataType dataType;
+
+        public TimestampWithTimezoneConverter(final DataType dataType) {
+            this.dataType = dataType;
+        }
 
         @Override
-        public OffsetDateTime convert(Timestamp data) {
-            return OffsetDateTime.ofInstant(data.toInstant(), ZoneId.of("UTC"));
+        public OffsetDateTime convert(Object data) {
+            final Timestamp convertedTimestamp = DataTypeUtils.toTimestamp(data, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), null);
+            return OffsetDateTime.ofInstant(convertedTimestamp.toInstant(), ZoneId.of("UTC"));
         }
     }
 
-    static class UUIDtoByteArrayConverter extends DataConverter<UUID, byte[]> {
+    static class UUIDtoByteArrayConverter extends DataConverter<Object, byte[]> {
 
         @Override
-        public byte[] convert(UUID data) {
+        public byte[] convert(Object data) {
+            final UUID uuid = DataTypeUtils.toUUID(data);
             ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
-            byteBuffer.putLong(data.getMostSignificantBits());
-            byteBuffer.putLong(data.getLeastSignificantBits());
+            byteBuffer.putLong(uuid.getMostSignificantBits());
+            byteBuffer.putLong(uuid.getLeastSignificantBits());
             return byteBuffer.array();
         }
     }
@@ -113,7 +160,7 @@ public class GenericDataConverters {
         }
     }
 
-    static class BigDecimalConverter extends DataConverter<BigDecimal, BigDecimal> {
+    static class BigDecimalConverter extends DataConverter<Object, BigDecimal> {
         private final int precision;
         private final int scale;
 
@@ -123,10 +170,15 @@ public class GenericDataConverters {
         }
 
         @Override
-        public BigDecimal convert(BigDecimal data) {
-            Validate.isTrue(data.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, data);
-            Validate.isTrue(data.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision: %s", precision, scale, data);
-            return data;
+        public BigDecimal convert(Object data) {
+            if (data instanceof BigDecimal) {
+                BigDecimal bigDecimal = (BigDecimal) data;
+                Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, bigDecimal.scale(), data);
+                Validate.isTrue(bigDecimal.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision %s for value: %s",
+                        precision, scale, bigDecimal.precision(), data);
+                return bigDecimal;
+            }
+            return DataTypeUtils.toBigDecimal(data, null);
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
index 6a74eca4fd..33049123ef 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java
@@ -46,7 +46,6 @@ import java.util.stream.Collectors;
 public class IcebergRecordConverter {
 
     private final DataConverter<Record, GenericRecord> converter;
-
     public GenericRecord convert(Record record) {
         return converter.convert(record);
     }
@@ -85,21 +84,21 @@ public class IcebergRecordConverter {
                     case DOUBLE:
                     case DATE:
                     case STRING:
-                        return new GenericDataConverters.SameTypeConverter();
+                        return new GenericDataConverters.PrimitiveTypeConverter(type, dataType);
                     case TIME:
-                        return new GenericDataConverters.TimeConverter();
+                        return new GenericDataConverters.TimeConverter(dataType.getFormat());
                     case TIMESTAMP:
                         final Types.TimestampType timestampType = (Types.TimestampType) type;
                         if (timestampType.shouldAdjustToUTC()) {
-                            return new GenericDataConverters.TimestampWithTimezoneConverter();
+                            return new GenericDataConverters.TimestampWithTimezoneConverter(dataType);
                         }
-                        return new GenericDataConverters.TimestampConverter();
+                        return new GenericDataConverters.TimestampConverter(dataType);
                     case UUID:
                         final UUIDDataType uuidType = (UUIDDataType) dataType;
                         if (uuidType.getFileFormat() == FileFormat.PARQUET) {
                             return new GenericDataConverters.UUIDtoByteArrayConverter();
                         }
-                        return new GenericDataConverters.SameTypeConverter();
+                        return new GenericDataConverters.PrimitiveTypeConverter(type, dataType);
                     case FIXED:
                         final Types.FixedType fixedType = (Types.FixedType) type;
                         return new GenericDataConverters.FixedConverter(fixedType.length());
@@ -167,7 +166,9 @@ public class IcebergRecordConverter {
                 return new RecordTypeWithFieldNameMapper(new Schema(schema.findField(fieldId).type().asStructType().fields()), (RecordDataType) field.getDataType());
             }
 
-            if (field.getDataType().getFieldType().equals(RecordFieldType.UUID)) {
+            // If the source field or target field is of type UUID, create a UUIDDataType from it
+            if (field.getDataType().getFieldType().equals(RecordFieldType.UUID)
+                    || schema.findField(fieldId).type().typeId() == Type.TypeID.UUID) {
                 return new UUIDDataType(field.getDataType(), fileFormat);
             }
 
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index f69946e2b8..18690ffe78 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -67,6 +67,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.OffsetDateTime;
+import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -143,6 +144,23 @@ public class TestIcebergRecordConverter {
             Types.NestedField.optional(14, "choice", Types.IntegerType.get())
     );
 
+    private static final Schema COMPATIBLE_PRIMITIVES_SCHEMA = new Schema(
+            Types.NestedField.optional(0, "string", Types.StringType.get()),
+            Types.NestedField.optional(1, "integer", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "float", Types.FloatType.get()),
+            Types.NestedField.optional(3, "long", Types.LongType.get()),
+            Types.NestedField.optional(4, "double", Types.DoubleType.get()),
+            Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10, 
2)),
+            Types.NestedField.optional(7, "fixed", 
Types.FixedType.ofLength(5)),
+            Types.NestedField.optional(8, "binary", Types.BinaryType.get()),
+            Types.NestedField.optional(9, "date", Types.DateType.get()),
+            Types.NestedField.optional(10, "time", Types.TimeType.get()),
+            Types.NestedField.optional(11, "timestamp", 
Types.TimestampType.withZone()),
+            Types.NestedField.optional(12, "timestampTz", 
Types.TimestampType.withoutZone()),
+            Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
+            Types.NestedField.optional(14, "choice", Types.IntegerType.get())
+    );
+
     private static final Schema CASE_INSENSITIVE_SCHEMA = new Schema(
             Types.NestedField.optional(0, "FIELD1", Types.StringType.get()),
             Types.NestedField.optional(1, "Field2", Types.StringType.get()),
@@ -221,6 +239,26 @@ public class TestIcebergRecordConverter {
         return new SimpleRecordSchema(fields);
     }
 
+    private static RecordSchema getPrimitivesAsCompatiblesSchema() {
+        List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("string", 
RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("integer", 
RecordFieldType.SHORT.getDataType()));
+        fields.add(new RecordField("float", 
RecordFieldType.DOUBLE.getDataType()));
+        fields.add(new RecordField("long", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("double", 
RecordFieldType.FLOAT.getDataType()));
+        fields.add(new RecordField("decimal", 
RecordFieldType.DOUBLE.getDataType()));
+        fields.add(new RecordField("fixed", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+        fields.add(new RecordField("binary", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+        fields.add(new RecordField("date", 
RecordFieldType.STRING.getDataType("yyyy-MM-dd")));
+        fields.add(new RecordField("time", 
RecordFieldType.STRING.getDataType("hh:mm:ss.SSS")));
+        fields.add(new RecordField("timestamp", 
RecordFieldType.STRING.getDataType("yyyy-MM-dd hh:mm:ss.SSSZ")));
+        fields.add(new RecordField("timestampTz", 
RecordFieldType.STRING.getDataType("yyyy-MM-dd hh:mm:ss.SSSZ")));
+        fields.add(new RecordField("uuid", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("choice", 
RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), 
RecordFieldType.INT.getDataType())));
+
+        return new SimpleRecordSchema(fields);
+    }
+
     private static RecordSchema getCaseInsensitiveSchema() {
         List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("field1", 
RecordFieldType.STRING.getDataType()));
@@ -331,6 +369,27 @@ public class TestIcebergRecordConverter {
         return new MapRecord(getPrimitivesSchema(), values);
     }
 
+    private static Record setupCompatiblePrimitivesTestRecord() {
+
+        Map<String, Object> values = new HashMap<>();
+        values.put("string", 123);
+        values.put("integer", 8);
+        values.put("float", 1.23456);
+        values.put("long", 42L);
+        values.put("double", 3.14159);
+        values.put("decimal", 12345678.12);
+        values.put("fixed", "hello".getBytes());
+        values.put("binary", "hello".getBytes());
+        values.put("date", "2017-04-04");
+        values.put("time", "14:20:33.000");
+        values.put("timestamp", "2017-04-04 14:20:33.789-0500");
+        values.put("timestampTz", "2017-04-04 14:20:33.789-0500");
+        values.put("uuid", "0000-00-00-00-000000");
+        values.put("choice", "10");
+
+        return new MapRecord(getPrimitivesAsCompatiblesSchema(), values);
+    }
+
     private static Record setupCaseInsensitiveTestRecord() {
         Map<String, Object> values = new HashMap<>();
         values.put("field1", "Text1");
@@ -414,6 +473,48 @@ public class TestIcebergRecordConverter {
         }
     }
 
+    @DisabledOnOs(WINDOWS)
+    @ParameterizedTest
+    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
+    public void testCompatiblePrimitives(FileFormat format) throws IOException {
+        RecordSchema nifiSchema = getPrimitivesAsCompatiblesSchema();
+        Record record = setupCompatiblePrimitivesTestRecord();
+
+        IcebergRecordConverter recordConverter = new IcebergRecordConverter(COMPATIBLE_PRIMITIVES_SCHEMA, nifiSchema, format);
+        GenericRecord genericRecord = recordConverter.convert(record);
+
+        writeTo(format, COMPATIBLE_PRIMITIVES_SCHEMA, genericRecord, tempFile);
+
+        List<GenericRecord> results = readFrom(format, COMPATIBLE_PRIMITIVES_SCHEMA, tempFile.toInputFile());
+
+        assertEquals(results.size(), 1);
+        GenericRecord resultRecord = results.get(0);
+
+        LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
+        OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
+        LocalDateTime expectedLocalDateTimestamp = offsetDateTime.atZoneSameInstant(ZoneId.systemDefault()).toLocalDateTime();
+
+        assertEquals("123", resultRecord.get(0, String.class));
+        assertEquals(Integer.valueOf(8), resultRecord.get(1, Integer.class));
+        assertEquals(Float.valueOf(1.23456F), resultRecord.get(2, Float.class));
+        assertEquals(Long.valueOf(42L), resultRecord.get(3, Long.class));
+        assertEquals(Double.valueOf(3.141590118408203), resultRecord.get(4, Double.class));
+        assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(6, byte[].class));
+        assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, ByteBuffer.class).array());
+        assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(8, LocalDate.class));
+        assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(9, LocalTime.class));
+        assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(10, OffsetDateTime.class));
+        assertEquals(expectedLocalDateTimestamp, resultRecord.get(11, LocalDateTime.class));
+        assertEquals(Integer.valueOf(10), resultRecord.get(13, Integer.class));
+
+        if (format.equals(PARQUET)) {
+            assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(12, byte[].class));
+        } else {
+            assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(12, UUID.class));
+        }
+    }
+
     @DisabledOnOs(WINDOWS)
     @ParameterizedTest
     @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
index 5cdac5fe8c..c537a9e496 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
@@ -19,7 +19,7 @@
  "type": "record",
  "name": "User",
  "fields": [
-     {"name": "id",  "type": ["int", "null"]},
+     {"name": "id",  "type": ["long", "null"]},
      {"name": "name", "type": ["string", "null"]},
      {"name": "department", "type": ["string", "null"]}
  ]
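
For context on the change itself (this note and the sketch below are not part of the patch): the new PrimitiveTypeConverter dispatches on the target Iceberg type and routes each value through NiFi's DataTypeUtils, so a compatible-but-different source type (e.g. a SHORT record field written to an INTEGER column, as exercised by testCompatiblePrimitives above) is coerced instead of being passed through unchanged. A minimal standalone sketch of those DataTypeUtils calls, assuming nifi-record is on the classpath; the class name and sample values are hypothetical:

    import java.math.BigDecimal;

    import org.apache.nifi.serialization.record.util.DataTypeUtils;

    public class ConversionSketch {
        public static void main(String[] args) {
            // The same DataTypeUtils calls the new converter dispatches to,
            // applied to illustrative values:
            Integer i = DataTypeUtils.toInteger((short) 8, null);          // SHORT  -> INTEGER
            Long l = DataTypeUtils.toLong(42, null);                       // INT    -> LONG
            Float f = DataTypeUtils.toFloat(1.23456d, null);               // DOUBLE -> FLOAT
            String s = DataTypeUtils.toString(123, () -> null);            // INT    -> STRING
            BigDecimal d = DataTypeUtils.toBigDecimal(12345678.12d, null); // DOUBLE -> DECIMAL
            System.out.println(i + " " + l + " " + f + " " + s + " " + d);
        }
    }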
