This is an automated email from the ASF dual-hosted git repository. mattyb149 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
commit f91311da9d759fb5712ee8121d54d27a7deea236 Author: Matthew Burgess <mattyb...@apache.org> AuthorDate: Wed Mar 6 09:53:13 2019 -0500 NIFI-6105: Fix handling of arrays of records/maps in record utilities This closes #3353. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> --- .../serialization/record/util/DataTypeUtils.java | 27 +++++++++++++++++ .../serialization/record/TestDataTypeUtils.java | 35 ++++++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index a399f67..fb6cdbd 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -204,6 +204,30 @@ public class DataTypeUtils { if (value == null) { return false; } + + // value may be a Map even when type is RECORD + if (value instanceof Map) { + final RecordSchema schema = ((RecordDataType) dataType).getChildSchema(); + if (schema == null) { + return true; + } + Map<String, Object> record = ((Map<String, Object>) value); + for (final RecordField childField : schema.getFields()) { + final Object childValue = record.get(childField.getFieldName()); + if (childValue == null && !childField.isNullable()) { + logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName()); + return false; + } + if (childValue == null) { + continue; // consider compatible + } + + if (!isCompatibleDataType(childValue, childField.getDataType())) { + return false; + } + } + return true; + } if (!(value instanceof Record)) { return false; } @@ -687,6 +711,9 @@ public class DataTypeUtils { return convertRecordMapToJavaMap((Map) value, ((MapDataType) dataType).getValueType()); } else if (dataType != null && isScalarValue(dataType, value)) { return value; + } else if (value instanceof Object[] && dataType instanceof ArrayDataType) { + // This is likely a Map whose values are represented as an array. Return a new array with each element converted to a Java object + return convertRecordArrayToJavaArray((Object[]) value, ((ArrayDataType) dataType).getElementType()); } throw new IllegalTypeConversionException("Cannot convert value of class " + value.getClass().getName() + " because the type is not supported"); diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java index 45b65b4..cef0eec 100644 --- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java @@ -108,6 +108,34 @@ public class TestDataTypeUtils { } @Test + public void testConvertArrayOfRecordsToJavaArray() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("stringField", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("intField", RecordFieldType.INT.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values1 = new HashMap<>(); + values1.put("stringField", "hello"); + values1.put("intField", 5); + final Record inputRecord1 = new MapRecord(schema, values1); + + final Map<String, Object> values2 = new HashMap<>(); + values2.put("stringField", "world"); + values2.put("intField", 50); + final Record inputRecord2 = new MapRecord(schema, values2); + + Object[] recordArray = {inputRecord1, inputRecord2}; + Object resultObj = DataTypeUtils.convertRecordFieldtoObject(recordArray, RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(schema))); + assertNotNull(resultObj); + assertTrue(resultObj instanceof Object[]); + Object[] resultArray = (Object[]) resultObj; + for(Object o : resultArray) { + assertTrue(o instanceof Map); + } + } + + @Test @SuppressWarnings("unchecked") public void testConvertRecordFieldToObject() { assertNull(DataTypeUtils.convertRecordFieldtoObject(null, null)); @@ -252,4 +280,11 @@ public class TestDataTypeUtils { } } } + + @Test + public void testIsCompatibleDataTypeMap() { + Map<String,Object> testMap = new HashMap<>(); + testMap.put("Hello", "World"); + assertTrue(DataTypeUtils.isCompatibleDataType(testMap, RecordFieldType.RECORD.getDataType())); + } }