markap14 commented on code in PR #7745: URL: https://github.com/apache/nifi/pull/7745#discussion_r1417519143
########## nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java: ########## @@ -1837,7 +1838,41 @@ public void testUnescapeJson() { put("firstName", "John"); put("age", 30); }}, "json_str"), - RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() + ); + + // test nested Record converted from Map Object + final Record nestedRecordFromMap = new MapRecord(schema, + Collections.singletonMap( + "json_str", + "{\"firstName\":\"John\",\"age\":30,\"addresses\":[{\"address_1\":\"123 Fake Street\"}]}") + ); + // recursively convert Maps to Records (addresses becomes and ARRAY or RECORDs) + assertEquals( + DataTypeUtils.toRecord(new LinkedHashMap<String, Object>(){{ + put("firstName", "John"); + put("age", 30); + put("addresses", new Object[] {DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Fake Street"), "addresses")}); + }}, "json_str"), Review Comment: We should not be creating subclasses of LinkedHashMap for initialization purposes.
Instead, we should just declare a LinkedHashMap and add elements to it: ``` Map<String, Object> values = new LinkedHashMap<>(); values.put("firstName", "John"); values.put("age", 30); values.put("addresses", new Object[] {DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Fake Street"), "addresses")}); ``` ########## nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java: ########## @@ -1837,7 +1838,41 @@ public void testUnescapeJson() { put("firstName", "John"); put("age", 30); }}, "json_str"), - RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() + ); + + // test nested Record converted from Map Object + final Record nestedRecordFromMap = new MapRecord(schema, + Collections.singletonMap( + "json_str", + "{\"firstName\":\"John\",\"age\":30,\"addresses\":[{\"address_1\":\"123 Fake Street\"}]}") Review Comment: It would probably be cleaner to use a text block here: ``` """ {"firstName":"John","age":30,"address"...}""" ``` ########## nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java: ########## @@ -469,21 +473,51 @@ public static RecordSchema inferSchema(final Map<String, Object> values, final S final RecordField recordField = new RecordField(key, inferredDataType, true); inferredFieldTypes.add(recordField); - final Object coercedValue = convertType(rawValue, inferredDataType, fieldName, charset); - coercedValues.put(key, coercedValue); + convertType(rawValue, inferredDataType, fieldName, charset); } - final RecordSchema inferredSchema = new SimpleRecordSchema(inferredFieldTypes); - return inferredSchema; + return new SimpleRecordSchema(inferredFieldTypes); } public static Record toRecord(final Object value, final
String fieldName, final Charset charset) { + return toRecord(value, fieldName, charset, false); + } + + private static Object covertObjectToRecord(final Object rawValue, final String key, final Charset charset) { Review Comment: typo - name is `covert...` instead of `convert...` But we should not have a method named `convertObjectToRecord` that returns an Object that may be an array, may be a Record, or may be any other object that was provided. Need to either always return a Record or create a method name that reflects what is truly done. ########## nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java: ########## @@ -1837,7 +1838,41 @@ public void testUnescapeJson() { put("firstName", "John"); put("age", 30); }}, "json_str"), - RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() + ); + + // test nested Record converted from Map Object + final Record nestedRecordFromMap = new MapRecord(schema, + Collections.singletonMap( + "json_str", + "{\"firstName\":\"John\",\"age\":30,\"addresses\":[{\"address_1\":\"123 Fake Street\"}]}") + ); + // recursively convert Maps to Records (addresses becomes and ARRAY or RECORDs) + assertEquals( + DataTypeUtils.toRecord(new LinkedHashMap<String, Object>(){{ + put("firstName", "John"); + put("age", 30); + put("addresses", new Object[] {DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Fake Street"), "addresses")}); + }}, "json_str"), + RecordPath.compile("unescapeJson(/json_str, 'true', 'true')").evaluate(nestedRecordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() + ); + // convert Map to Record, without recursion (addresses becomes an ARRAY, but contents are still Maps) + 
assertEquals( + DataTypeUtils.toRecord(new LinkedHashMap<String, Object>(){{ + put("firstName", "John"); + put("age", 30); + put("addresses", new Object[] {Collections.singletonMap("address_1", "123 Fake Street")}); + }}, "json_str"), + RecordPath.compile("unescapeJson(/json_str, 'true', 'false')").evaluate(nestedRecordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() + ); + // without Map conversion to Record (addresses remains a Collection, Maps are unchanged) + assertEquals( + new LinkedHashMap<String, Object>(){{ + put("firstName", "John"); + put("age", 30); + put("addresses", Collections.singletonList(Collections.singletonMap("address_1", "123 Fake Street"))); + }}, Review Comment: As well as here ########## nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java: ########## @@ -468,6 +470,67 @@ public void testSetRootWithUnescapeJsonCall() throws InitializationException, IO runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); } + @Test + public void testSetNestedRootWithUnescapeJsonCall() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/organisation-with-departments-string.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/organisation-with-departments.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + 
runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + final Path input = Paths.get("src/test/resources/TestUpdateRecord/input/organisation.json"); + + // recursive conversion + runner.enqueue(input); + runner.setProperty("/", "unescapeJson(/record_json, 'true', 'true')"); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/organisation.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + assertTrue(runner.getLogger().getErrorMessages().isEmpty()); + + // no conversion + runner.clearTransferState(); + runner.enqueue(input); + runner.setProperty("/", "unescapeJson(/record_json)"); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + expectedOutput = "[ {\n" + + " \"name\" : null,\n" + + " \"departments\" : null,\n" + + " \"address\" : null\n" + + "} ]"; Review Comment: This should use a Text Block ########## nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java: ########## @@ -468,6 +470,67 @@ public void testSetRootWithUnescapeJsonCall() throws InitializationException, IO runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); } + @Test + public void testSetNestedRootWithUnescapeJsonCall() throws 
InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/organisation-with-departments-string.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/organisation-with-departments.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + final Path input = Paths.get("src/test/resources/TestUpdateRecord/input/organisation.json"); + + // recursive conversion + runner.enqueue(input); + runner.setProperty("/", "unescapeJson(/record_json, 'true', 'true')"); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/organisation.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + assertTrue(runner.getLogger().getErrorMessages().isEmpty()); + + // no conversion + runner.clearTransferState(); + 
runner.enqueue(input); + runner.setProperty("/", "unescapeJson(/record_json)"); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + expectedOutput = "[ {\n" + + " \"name\" : null,\n" + + " \"departments\" : null,\n" + + " \"address\" : null\n" + + "} ]"; + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + assertTrue(runner.getLogger().getErrorMessages().isEmpty()); + + // non-recursive conversion + runner.clearTransferState(); + runner.enqueue(input); + runner.setProperty("/", "unescapeJson(/record_json, 'true')"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_FAILURE, 1); + final LogMessage errorMessage = runner.getLogger().getErrorMessages().get(0); + assertTrue(errorMessage.getMsg().contains("java.lang.ClassCastException: " + + "class java.util.LinkedHashMap cannot be cast to class org.apache.nifi.serialization.record.Record")); Review Comment: Checking for the text `ClassCastException` seems fair, but the text here is overly prescriptive. It could change at any time, and that should not cause failures of unit tests. 
########## nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java: ########## @@ -469,21 +473,51 @@ public static RecordSchema inferSchema(final Map<String, Object> values, final S final RecordField recordField = new RecordField(key, inferredDataType, true); inferredFieldTypes.add(recordField); - final Object coercedValue = convertType(rawValue, inferredDataType, fieldName, charset); - coercedValues.put(key, coercedValue); + convertType(rawValue, inferredDataType, fieldName, charset); } - final RecordSchema inferredSchema = new SimpleRecordSchema(inferredFieldTypes); - return inferredSchema; + return new SimpleRecordSchema(inferredFieldTypes); } public static Record toRecord(final Object value, final String fieldName, final Charset charset) { + return toRecord(value, fieldName, charset, false); + } + + private static Object covertObjectToRecord(final Object rawValue, final String key, final Charset charset) { + final Object coercedValue; + if (rawValue instanceof Map<?, ?>) { + coercedValue = toRecord(rawValue, key, charset, true); + } else if (rawValue instanceof Object[]) { + final Object[] objArray = (Object[]) rawValue; + coercedValue = Arrays.stream(objArray).noneMatch(o -> o instanceof Map<?, ?>) + ? objArray + : Arrays.stream(objArray).map(o -> toRecord(o, key, charset, true)).toArray(); + } else if (rawValue instanceof Collection<?>) { + final Collection<?> objCollection = (Collection<?>) rawValue; + // Records have ARRAY DataTypes, so convert any Collections + coercedValue = objCollection.stream().noneMatch(o -> o instanceof Map<?, ?>) + ? objCollection.toArray() + : objCollection.stream().map(o -> toRecord(o, key, charset, true)).toArray(); Review Comment: We should avoid creating a `Stream` in this part of the codebase, as it is very sensitive to performance. 
########## nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java: ########## @@ -802,13 +838,7 @@ public static Map<String, Object> toMap(final Object value, final String fieldNa if (value instanceof Map) { final Map<?, ?> original = (Map<?, ?>) value; - boolean keysAreStrings = true; - for (final Object key : original.keySet()) { - if (!(key instanceof String)) { - keysAreStrings = false; - break; - } - } + boolean keysAreStrings = original.keySet().stream().allMatch(key -> key instanceof String); Review Comment: This should be reverted back. `DataTypeUtils` is extremely sensitive to performance as this method could potentially be called 100 times or more for a single Record, so we should not use `Stream` here - creation of a `Stream` has important performance impacts and should be reserved for non-performance-critical parts of the codebase. ########## nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java: ########## @@ -1837,7 +1838,41 @@ public void testUnescapeJson() { put("firstName", "John"); put("age", 30); }}, "json_str"), - RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() + ); + + // test nested Record converted from Map Object + final Record nestedRecordFromMap = new MapRecord(schema, + Collections.singletonMap( + "json_str", + "{\"firstName\":\"John\",\"age\":30,\"addresses\":[{\"address_1\":\"123 Fake Street\"}]}") + ); + // recursively convert Maps to Records (addresses becomes and ARRAY or RECORDs) + assertEquals( + DataTypeUtils.toRecord(new LinkedHashMap<String, Object>(){{ + put("firstName", "John"); + put("age", 30); + put("addresses", new Object[] 
{DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Fake Street"), "addresses")}); + }}, "json_str"), + RecordPath.compile("unescapeJson(/json_str, 'true', 'true')").evaluate(nestedRecordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() + ); + // convert Map to Record, without recursion (addresses becomes an ARRAY, but contents are still Maps) + assertEquals( + DataTypeUtils.toRecord(new LinkedHashMap<String, Object>(){{ + put("firstName", "John"); + put("age", 30); + put("addresses", new Object[] {Collections.singletonMap("address_1", "123 Fake Street")}); + }}, "json_str"), + RecordPath.compile("unescapeJson(/json_str, 'true', 'false')").evaluate(nestedRecordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() Review Comment: Likewise here, we should not be subclassing LinkedHashMap. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org