This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e7da2ab5b8 NIFI-11177: Add defensive code for null values for Iceberg
e7da2ab5b8 is described below

commit e7da2ab5b81ee96493ba2c20203fb247411f45eb
Author: Matt Burgess <mattyb...@apache.org>
AuthorDate: Thu Sep 21 23:56:58 2023 -0400

    NIFI-11177: Add defensive code for null values for Iceberg
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #7777.
---
 .../serialization/record/util/DataTypeUtils.java   |  2 +-
 .../iceberg/converter/GenericDataConverters.java   | 29 +++++++++++++++++++---
 .../iceberg/TestIcebergRecordConverter.java        | 28 +++++++++++++++++++++
 3 files changed, 55 insertions(+), 4 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 8752d85b19..7464b28ee9 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -238,7 +238,7 @@ public class DataTypeUtils {
 
     public static UUID toUUID(Object value) {
         if (value == null) {
-            throw new IllegalTypeConversionException("Null values cannot be converted to a UUID");
+            return null;
         }
 
         if (value instanceof UUID) {
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
index 8854bf6e0c..c8ee7bd171 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java
@@ -29,6 +29,7 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
@@ -91,7 +92,8 @@ public class GenericDataConverters {
 
         @Override
         public LocalTime convert(Object data) {
-            return DataTypeUtils.toTime(data, () -> DataTypeUtils.getDateFormat(timeFormat), null).toLocalTime();
+            Time time = DataTypeUtils.toTime(data, () -> DataTypeUtils.getDateFormat(timeFormat), null);
+            return time == null ? null : time.toLocalTime();
         }
     }
 
@@ -106,7 +108,7 @@ public class GenericDataConverters {
         @Override
         public LocalDateTime convert(Object data) {
             final Timestamp convertedTimestamp = DataTypeUtils.toTimestamp(data, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), null);
-            return convertedTimestamp.toLocalDateTime();
+            return convertedTimestamp == null ? null : convertedTimestamp.toLocalDateTime();
         }
     }
 
@@ -121,7 +123,7 @@ public class GenericDataConverters {
         @Override
         public OffsetDateTime convert(Object data) {
             final Timestamp convertedTimestamp = DataTypeUtils.toTimestamp(data, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), null);
-            return OffsetDateTime.ofInstant(convertedTimestamp.toInstant(), ZoneId.of("UTC"));
+            return convertedTimestamp == null ? null : OffsetDateTime.ofInstant(convertedTimestamp.toInstant(), ZoneId.of("UTC"));
         }
     }
 
@@ -129,6 +131,9 @@ public class GenericDataConverters {
 
         @Override
         public byte[] convert(Object data) {
+            if (data == null) {
+                return null;
+            }
             final UUID uuid = DataTypeUtils.toUUID(data);
             ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
             byteBuffer.putLong(uuid.getMostSignificantBits());
@@ -147,6 +152,9 @@ public class GenericDataConverters {
 
         @Override
         public byte[] convert(Byte[] data) {
+            if (data == null) {
+                return null;
+            }
             Validate.isTrue(data.length == length, String.format("Cannot write byte array of length %s as fixed[%s]", data.length, length));
             return ArrayUtils.toPrimitive(data);
         }
@@ -156,6 +164,9 @@ public class GenericDataConverters {
 
         @Override
         public ByteBuffer convert(Byte[] data) {
+            if (data == null) {
+                return null;
+            }
             return ByteBuffer.wrap(ArrayUtils.toPrimitive(data));
         }
     }
@@ -171,6 +182,9 @@ public class GenericDataConverters {
 
         @Override
         public BigDecimal convert(Object data) {
+            if (data == null) {
+                return null;
+            }
             if (data instanceof BigDecimal) {
                 BigDecimal bigDecimal = (BigDecimal) data;
                 Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, bigDecimal.scale(), data);
@@ -194,6 +208,9 @@ public class GenericDataConverters {
         @Override
         @SuppressWarnings("unchecked")
         public List<T> convert(S[] data) {
+            if (data == null) {
+                return null;
+            }
             final int numElements = data.length;
             final List<T> result = new ArrayList<>(numElements);
             for (int i = 0; i < numElements; i += 1) {
@@ -219,6 +236,9 @@ public class GenericDataConverters {
         @Override
         @SuppressWarnings("unchecked")
         public Map<TK, TV> convert(Map<SK, SV> data) {
+            if (data == null) {
+                return null;
+            }
             final int mapSize = data.size();
             final Object[] keyArray = data.keySet().toArray();
             final Object[] valueArray = data.values().toArray();
@@ -253,6 +273,9 @@ public class GenericDataConverters {
 
         @Override
         public GenericRecord convert(Record data) {
+            if (data == null) {
+                return null;
+            }
             final GenericRecord record = GenericRecord.create(schema);
 
             for (DataConverter<?, ?> converter : converters) {
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index 18690ffe78..baf220fea2 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -83,6 +83,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.condition.OS.WINDOWS;
@@ -471,6 +472,33 @@ public class TestIcebergRecordConverter {
         } else {
             assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
         }
+
+        // Test null values
+        for (String fieldName : record.getRawFieldNames()) {
+            record.setValue(fieldName, null);
+        }
+
+        genericRecord = recordConverter.convert(record);
+
+        writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile);
+
+        results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile());
+
+        assertEquals(results.size(), 1);
+        resultRecord = results.get(0);
+        assertNull(resultRecord.get(0, String.class));
+        assertNull(resultRecord.get(1, Integer.class));
+        assertNull(resultRecord.get(2, Float.class));
+        assertNull(resultRecord.get(3, Long.class));
+        assertNull(resultRecord.get(4, Double.class));
+        assertNull(resultRecord.get(5, BigDecimal.class));
+        assertNull(resultRecord.get(6, Boolean.class));
+        assertNull(resultRecord.get(7));
+        assertNull(resultRecord.get(8));
+        assertNull(resultRecord.get(9, LocalDate.class));
+        assertNull(resultRecord.get(10, LocalTime.class));
+        assertNull(resultRecord.get(11, OffsetDateTime.class));
+        assertNull(resultRecord.get(14, Integer.class));
     }
 
     @DisabledOnOs(WINDOWS)

Reply via email to