This is an automated email from the ASF dual-hosted git repository. pvillard pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new e98a2ef5fd NIFI-11177: Add defensive code for null values for Iceberg e98a2ef5fd is described below commit e98a2ef5fd7b0c4229545130f6c5aebb670e8d27 Author: Matt Burgess <mattyb...@apache.org> AuthorDate: Thu Sep 21 23:56:58 2023 -0400 NIFI-11177: Add defensive code for null values for Iceberg Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> This closes #7777. --- .../serialization/record/util/DataTypeUtils.java | 2 +- .../iceberg/converter/GenericDataConverters.java | 29 +++++++++++++++++++--- .../iceberg/TestIcebergRecordConverter.java | 28 +++++++++++++++++++++ 3 files changed, 55 insertions(+), 4 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 8752d85b19..7464b28ee9 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -238,7 +238,7 @@ public class DataTypeUtils { public static UUID toUUID(Object value) { if (value == null) { - throw new IllegalTypeConversionException("Null values cannot be converted to a UUID"); + return null; } if (value instanceof UUID) { diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java index 8854bf6e0c..c8ee7bd171 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java @@ -29,6 +29,7 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.sql.Time; import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.LocalTime; @@ -91,7 +92,8 @@ public class GenericDataConverters { @Override public LocalTime convert(Object data) { - return DataTypeUtils.toTime(data, () -> DataTypeUtils.getDateFormat(timeFormat), null).toLocalTime(); + Time time = DataTypeUtils.toTime(data, () -> DataTypeUtils.getDateFormat(timeFormat), null); + return time == null ? null : time.toLocalTime(); } } @@ -106,7 +108,7 @@ public class GenericDataConverters { @Override public LocalDateTime convert(Object data) { final Timestamp convertedTimestamp = DataTypeUtils.toTimestamp(data, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), null); - return convertedTimestamp.toLocalDateTime(); + return convertedTimestamp == null ? null : convertedTimestamp.toLocalDateTime(); } } @@ -121,7 +123,7 @@ public class GenericDataConverters { @Override public OffsetDateTime convert(Object data) { final Timestamp convertedTimestamp = DataTypeUtils.toTimestamp(data, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), null); - return OffsetDateTime.ofInstant(convertedTimestamp.toInstant(), ZoneId.of("UTC")); + return convertedTimestamp == null ? null : OffsetDateTime.ofInstant(convertedTimestamp.toInstant(), ZoneId.of("UTC")); } } @@ -129,6 +131,9 @@ public class GenericDataConverters { @Override public byte[] convert(Object data) { + if (data == null) { + return null; + } final UUID uuid = DataTypeUtils.toUUID(data); ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]); byteBuffer.putLong(uuid.getMostSignificantBits()); @@ -147,6 +152,9 @@ public class GenericDataConverters { @Override public byte[] convert(Byte[] data) { + if (data == null) { + return null; + } Validate.isTrue(data.length == length, String.format("Cannot write byte array of length %s as fixed[%s]", data.length, length)); return ArrayUtils.toPrimitive(data); } @@ -156,6 +164,9 @@ public class GenericDataConverters { @Override public ByteBuffer convert(Byte[] data) { + if (data == null) { + return null; + } return ByteBuffer.wrap(ArrayUtils.toPrimitive(data)); } } @@ -171,6 +182,9 @@ public class GenericDataConverters { @Override public BigDecimal convert(Object data) { + if (data == null) { + return null; + } if (data instanceof BigDecimal) { BigDecimal bigDecimal = (BigDecimal) data; Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, bigDecimal.scale(), data); @@ -194,6 +208,9 @@ public class GenericDataConverters { @Override @SuppressWarnings("unchecked") public List<T> convert(S[] data) { + if (data == null) { + return null; + } final int numElements = data.length; final List<T> result = new ArrayList<>(numElements); for (int i = 0; i < numElements; i += 1) { @@ -219,6 +236,9 @@ public class GenericDataConverters { @Override @SuppressWarnings("unchecked") public Map<TK, TV> convert(Map<SK, SV> data) { + if (data == null) { + return null; + } final int mapSize = data.size(); final Object[] keyArray = data.keySet().toArray(); final Object[] valueArray = data.values().toArray(); @@ -253,6 +273,9 @@ public class GenericDataConverters { @Override public GenericRecord convert(Record data) { + if (data == null) { + return null; + } final GenericRecord record = GenericRecord.create(schema); for (DataConverter<?, ?> converter : converters) { diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java index 18690ffe78..baf220fea2 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java @@ -83,6 +83,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.condition.OS.WINDOWS; @@ -471,6 +472,33 @@ public class TestIcebergRecordConverter { } else { assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class)); } + + // Test null values + for (String fieldName : record.getRawFieldNames()) { + record.setValue(fieldName, null); + } + + genericRecord = recordConverter.convert(record); + + writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile); + + results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile()); + + assertEquals(results.size(), 1); + resultRecord = results.get(0); + assertNull(resultRecord.get(0, String.class)); + assertNull(resultRecord.get(1, Integer.class)); + assertNull(resultRecord.get(2, Float.class)); + assertNull(resultRecord.get(3, Long.class)); + assertNull(resultRecord.get(4, Double.class)); + assertNull(resultRecord.get(5, BigDecimal.class)); + assertNull(resultRecord.get(6, Boolean.class)); + assertNull(resultRecord.get(7)); + assertNull(resultRecord.get(8)); + assertNull(resultRecord.get(9, LocalDate.class)); + assertNull(resultRecord.get(10, LocalTime.class)); + assertNull(resultRecord.get(11, OffsetDateTime.class)); + assertNull(resultRecord.get(14, Integer.class)); } @DisabledOnOs(WINDOWS)