This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new dcc0e5edb7 NIFI-12847: Add Enum data type handling to Iceberg record converter
dcc0e5edb7 is described below

commit dcc0e5edb78f924776003f3d1187e83b2ec616b1
Author: Mark Bathori <mbath...@apache.org>
AuthorDate: Tue Feb 27 13:47:19 2024 +0100

    NIFI-12847: Add Enum data type handling to Iceberg record converter
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #8453.
---
 .../serialization/record/util/DataTypeUtils.java   |  2 +-
 .../iceberg/converter/ArrayElementGetter.java      |  4 +++
 .../iceberg/converter/RecordFieldGetter.java       |  4 +++
 .../iceberg/TestIcebergRecordConverter.java        |  6 ++++-
 .../iceberg/TestPutIcebergWithHiveCatalog.java     |  2 +-
 .../src/test/resources/user.avsc                   | 29 ++++++++++++++++------
 6 files changed, 36 insertions(+), 11 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 1d249eebd8..6cb46ce799 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1089,7 +1089,7 @@ public class DataTypeUtils {
         return enumType.getEnums() != null && 
enumType.getEnums().contains(value);
     }
 
-    private static Object toEnum(Object value, EnumDataType dataType, String 
fieldName) {
+    public static Object toEnum(Object value, EnumDataType dataType, String 
fieldName) {
         if(dataType.getEnums() != null && dataType.getEnums().contains(value)) 
{
             return value.toString();
         }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
index fda35a1e9e..1b8f7f18f0 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.iceberg.converter;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.EnumDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 
@@ -82,6 +83,9 @@ public class ArrayElementGetter {
             case TIMESTAMP:
                 elementGetter = element -> DataTypeUtils.toTimestamp(element, 
() -> DataTypeUtils.getDateFormat(dataType.getFormat()), ARRAY_FIELD_NAME);
                 break;
+            case ENUM:
+                elementGetter = element -> DataTypeUtils.toEnum(element, 
(EnumDataType) dataType, ARRAY_FIELD_NAME);
+                break;
             case UUID:
                 elementGetter = DataTypeUtils::toUUID;
                 break;
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
index 24a21d6ef7..a49530a124 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java
@@ -21,6 +21,7 @@ import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.EnumDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
@@ -87,6 +88,9 @@ public class RecordFieldGetter {
             case UUID:
                 fieldGetter = record -> 
DataTypeUtils.toUUID(record.getValue(fieldName));
                 break;
+            case ENUM:
+                fieldGetter = record -> 
DataTypeUtils.toEnum(record.getValue(fieldName), (EnumDataType) dataType, 
fieldName);
+                break;
             case ARRAY:
                 fieldGetter = record -> 
DataTypeUtils.toArray(record.getValue(fieldName), fieldName, ((ArrayDataType) 
dataType).getElementType());
                 break;
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
index e72db3a4b3..fdc9b0fa4e 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java
@@ -166,7 +166,8 @@ public class TestIcebergRecordConverter {
             Types.NestedField.optional(11, "timestamp", 
Types.TimestampType.withZone()),
             Types.NestedField.optional(12, "timestampTz", 
Types.TimestampType.withoutZone()),
             Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
-            Types.NestedField.optional(14, "choice", Types.IntegerType.get())
+            Types.NestedField.optional(14, "choice", Types.IntegerType.get()),
+            Types.NestedField.optional(15, "enum", Types.StringType.get())
     );
 
     private static final Schema PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS = new 
Schema(
@@ -294,6 +295,7 @@ public class TestIcebergRecordConverter {
         fields.add(new RecordField("timestampTz", 
RecordFieldType.TIMESTAMP.getDataType()));
         fields.add(new RecordField("uuid", 
RecordFieldType.UUID.getDataType()));
         fields.add(new RecordField("choice", 
RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), 
RecordFieldType.INT.getDataType())));
+        fields.add(new RecordField("enum", 
RecordFieldType.ENUM.getEnumDataType(Arrays.asList("red", "blue", "yellow"))));
 
         return new SimpleRecordSchema(fields);
     }
@@ -470,6 +472,7 @@ public class TestIcebergRecordConverter {
         values.put("timestampTz", Timestamp.valueOf(localDateTime));
         values.put("uuid", UUID.fromString("0000-00-00-00-000000"));
         values.put("choice", "10");
+        values.put("enum", "blue");
 
         return new MapRecord(getPrimitivesSchema(), values);
     }
@@ -593,6 +596,7 @@ public class TestIcebergRecordConverter {
         assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), 
resultRecord.get(11, OffsetDateTime.class));
         assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), 
resultRecord.get(12, LocalDateTime.class));
         assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
+        assertEquals("blue", resultRecord.get(15, String.class));
 
         if (format.equals(PARQUET)) {
             assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0}, resultRecord.get(13, byte[].class));
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index 0a0cd8ef96..b47a3cc6b5 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -109,7 +109,7 @@ public class TestPutIcebergWithHiveCatalog {
         RecordSchema recordSchema = AvroTypeUtil.createSchema(inputSchema);
 
         for (RecordField recordField : recordSchema.getFields()) {
-            readerFactory.addSchemaField(recordField.getFieldName(), 
recordField.getDataType().getFieldType(), recordField.isNullable());
+            readerFactory.addSchemaField(recordField);
         }
 
         readerFactory.addRecord(0, "John", "Finance");
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
index c537a9e496..799c0023b4 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc
@@ -15,12 +15,25 @@
  * limitations under the License.
  */
 {
- "namespace": "nifi",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "id",  "type": ["long", "null"]},
-     {"name": "name", "type": ["string", "null"]},
-     {"name": "department", "type": ["string", "null"]}
- ]
+  "namespace": "nifi",
+  "type": "record",
+  "name": "User",
+  "fields": [
+    {
+      "name": "id",
+      "type": ["long", "null"]
+    },
+    {
+      "name": "name",
+      "type": ["string", "null"]
+    },
+    {
+      "name": "department",
+      "type": {
+        "name": "Department",
+        "type": "enum",
+        "symbols": ["Finance", "Marketing", "Sales"]
+      }
+    }
+  ]
 }

Reply via email to