This is an automated email from the ASF dual-hosted git repository. mattyb149 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 2846d3c3c6e454f7960763bd69e36e879684a033 Author: Koji Kawamura <ijokaruma...@apache.org> AuthorDate: Wed Mar 13 14:34:16 2019 +0900 NIFI-6105: Fix handling of arrays of records/maps in record utilities Refactored to use the same check logic for Record and Map types Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #3367 --- .../serialization/record/util/DataTypeUtils.java | 96 ++++++++++------------ 1 file changed, 42 insertions(+), 54 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index fb6cdbd..63db142 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -201,59 +201,8 @@ public class DataTypeUtils { case LONG: return isLongTypeCompatible(value); case RECORD: { - if (value == null) { - return false; - } - - // value may be a Map even when type is RECORD - if (value instanceof Map) { - final RecordSchema schema = ((RecordDataType) dataType).getChildSchema(); - if (schema == null) { - return true; - } - Map<String, Object> record = ((Map<String, Object>) value); - for (final RecordField childField : schema.getFields()) { - final Object childValue = record.get(childField.getFieldName()); - if (childValue == null && !childField.isNullable()) { - logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName()); - return false; - } - if (childValue == null) { - continue; // consider compatible - } - - if (!isCompatibleDataType(childValue, childField.getDataType())) { - return false; - } - } - return true; - } - if (!(value instanceof Record)) { - return false; - } - final RecordSchema schema = ((RecordDataType) dataType).getChildSchema(); - if (schema == null) { - return true; - } - - final Record record = (Record) value; - for (final RecordField childField : schema.getFields()) { - final Object childValue = record.getValue(childField); - if (childValue == null && !childField.isNullable()) { - logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName()); - return false; - } - if (childValue == null) { - continue; // consider compatible - } - - if (!isCompatibleDataType(childValue, childField.getDataType())) { - return false; - } - } - - return true; + return isRecordTypeCompatible(schema, value); } case SHORT: return isShortTypeCompatible(value); @@ -539,8 +488,47 @@ public class DataTypeUtils { return RecordFieldType.RECORD.getRecordDataType(schema); } - public static boolean isRecordTypeCompatible(final Object value) { - return value instanceof Record; + /** + * Check if the given record structured object compatible with the schema. + * @param schema record schema, schema validation will not be performed if schema is null + * @param value the record structured object, i.e. Record or Map + * @return True if the object is compatible with the schema + */ + private static boolean isRecordTypeCompatible(RecordSchema schema, Object value) { + + if (value == null) { + return false; + } + + if (!(value instanceof Record) && !(value instanceof Map)) { + return false; + } + + if (schema == null) { + return true; + } + + for (final RecordField childField : schema.getFields()) { + final Object childValue; + if (value instanceof Record) { + childValue = ((Record) value).getValue(childField); + } else { + childValue = ((Map) value).get(childField.getFieldName()); + } + + if (childValue == null && !childField.isNullable()) { + logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName()); + return false; + } + if (childValue == null) { + continue; // consider compatible + } + + if (!isCompatibleDataType(childValue, childField.getDataType())) { + return false; + } + } + return true; } public static Object[] toArray(final Object value, final String fieldName, final DataType elementDataType) {