This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 4636d9c79da053da178b26709e155db0f5680051 Author: Mark Payne <marka...@hotmail.com> AuthorDate: Thu Jun 20 11:37:15 2024 -0400 NIFI-13423 Corrected Starting Field handling in JsonTreeReader Ensure that when using a Starting Field Name, JsonTreeReader is still able to process all records This closes #8989 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../nifi/json/AbstractJsonRowRecordReader.java | 153 ++++++++-------- .../nifi/json/TestJsonTreeRowRecordReader.java | 200 ++++++++++++--------- 2 files changed, 198 insertions(+), 155 deletions(-) diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java index c43cfd190f..d213e6ab36 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java @@ -83,21 +83,16 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { private final String dateFormat; private final String timeFormat; private final String timestampFormat; + private final String nestedFieldName; - private boolean firstObjectConsumed = false; - private JsonParser jsonParser; - private JsonNode firstJsonNode; - private StartingFieldStrategy strategy; - private Map<String, String> capturedFields; - private BiPredicate<String, String> captureFieldPredicate; + private final JsonParser jsonParser; + private final StartingFieldStrategy strategy; + private final Map<String, String> capturedFields; + private final BiPredicate<String, String> captureFieldPredicate; - private AbstractJsonRowRecordReader(final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat) { - this.logger = logger; + // Keeps track of whether or not we've skipped to the starting field for the current object when using the NESTED_FIELD strategy + private boolean skippedToStartField = false; - this.dateFormat = dateFormat; - this.timeFormat = timeFormat; - this.timestampFormat = timestampFormat; - } /** * Constructor with initial logic for JSON to NiFi record parsing. @@ -130,7 +125,12 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { final TokenParserFactory tokenParserFactory) throws IOException, MalformedRecordException { - this(logger, dateFormat, timeFormat, timestampFormat); + this.logger = logger; + + this.dateFormat = dateFormat; + this.timeFormat = timeFormat; + this.timestampFormat = timestampFormat; + this.nestedFieldName = nestedFieldName; this.strategy = strategy; this.captureFieldPredicate = captureFieldPredicate; @@ -141,28 +141,6 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { jsonParser = tokenParserFactory.getJsonParser(in, configuredStreamReadConstraints, allowComments); jsonParser.enable(Feature.USE_FAST_DOUBLE_PARSER); jsonParser.enable(Feature.USE_FAST_BIG_NUMBER_PARSER); - - if (strategy == StartingFieldStrategy.NESTED_FIELD) { - while (jsonParser.nextToken() != null) { - if (nestedFieldName.equals(jsonParser.currentName())) { - logger.debug("Parsing starting at nested field [{}]", nestedFieldName); - break; - } - if (captureFieldPredicate != null) { - captureCurrentField(captureFieldPredicate); - } - } - } - - JsonToken token = jsonParser.nextToken(); - if (token == JsonToken.START_ARRAY) { - token = jsonParser.nextToken(); // advance to START_OBJECT token - } - if (token == JsonToken.START_OBJECT) { // could be END_ARRAY also - firstJsonNode = jsonParser.readValueAsTree(); - } else { - firstJsonNode = null; - } } catch (final JsonParseException e) { throw new MalformedRecordException("Could not parse data as JSON", e); } @@ -190,6 +168,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { captureCurrentField(captureFieldPredicate); } } + return null; } @@ -302,7 +281,11 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { return null; } - private void captureCurrentField(BiPredicate<String, String> captureFieldPredicate) throws IOException { + private void captureCurrentField(final BiPredicate<String, String> captureFieldPredicate) throws IOException { + if (captureFieldPredicate == null) { + return; + } + if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME) { jsonParser.nextToken(); @@ -402,57 +385,79 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { return new MapRecord(childSchema, childValues, serializedForm); } - protected JsonNode getNextJsonNode() throws IOException, MalformedRecordException { - if (!firstObjectConsumed) { - firstObjectConsumed = true; - return firstJsonNode; - } - if (strategy == StartingFieldStrategy.NESTED_FIELD) { - return getJsonNodeWithNestedNodeStrategy(); - } else { - return getJsonNode(); - } - } + private JsonNode getNextJsonNode() throws IOException, MalformedRecordException { + try { + while (true) { + final JsonToken token = jsonParser.nextToken(); + if (token == null) { + return null; + } - private JsonNode getJsonNodeWithNestedNodeStrategy() throws IOException, MalformedRecordException { - while (true) { - final JsonToken token = jsonParser.nextToken(); - if (token == null) { - return null; + switch (token) { + case START_ARRAY: + break; + case END_ARRAY: + case END_OBJECT: + skippedToStartField = false; + break; + case START_OBJECT: + if (strategy == StartingFieldStrategy.NESTED_FIELD) { + if (!skippedToStartField) { + skipToStartingField(); + skippedToStartField = true; + break; + } + } + + return jsonParser.readValueAsTree(); + default: + // We got a token that isn't expected. This can happen when using the Nested Field Strategy. + // For example, the field given has a String as a value instead of a Record. In this case, we want to skip to the next field. + // Read to the end of the object/array + skipToEndOfObject(); + break; + } } + } catch (final JsonParseException e) { + throw new MalformedRecordException("Failed to parse JSON", e); + } + } + + private void skipToEndOfObject() throws IOException { + int depth = 0; + JsonToken token; + while ((token = jsonParser.nextToken()) != null) { + captureCurrentField(captureFieldPredicate); switch (token) { - case START_ARRAY: + case START_OBJECT: + depth++; break; - case END_ARRAY: case END_OBJECT: - case FIELD_NAME: - return null; - case START_OBJECT: - return jsonParser.readValueAsTree(); + depth--; + if (depth == 0) { + return; + } + break; default: - throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name()); + break; } } } - private JsonNode getJsonNode() throws IOException, MalformedRecordException { - while (true) { - final JsonToken token = jsonParser.nextToken(); - if (token == null) { - return null; - } + private void skipToStartingField() throws IOException { + if (strategy != StartingFieldStrategy.NESTED_FIELD) { + return; + } - switch (token) { - case START_ARRAY: - case END_ARRAY: - case END_OBJECT: - break; - case START_OBJECT: - return jsonParser.readValueAsTree(); - default: - throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name()); + while (jsonParser.nextToken() != null) { + if (nestedFieldName.equals(jsonParser.currentName())) { + logger.debug("Parsing starting at nested field [{}]", nestedFieldName); + break; + } + if (captureFieldPredicate != null) { + captureCurrentField(captureFieldPredicate); } } } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java index 57a399f5ea..8f51faf53d 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java @@ -318,7 +318,6 @@ class TestJsonTreeRowRecordReader { void testReadJSONDisallowComments() { final MalformedRecordException mre = assertThrows(MalformedRecordException.class, () -> testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", false, StreamReadConstraints.builder().maxStringLength(20_000).build())); - assertTrue(mre.getMessage().contains("not parse")); } private void testReadAccountJson(final String inputFile, final boolean allowComments, final StreamReadConstraints streamReadConstraints) throws IOException, MalformedRecordException { @@ -713,6 +712,77 @@ class TestJsonTreeRowRecordReader { } } + @Test + public void testMultipleInputRecordsWithStartingFieldArray() throws IOException, MalformedRecordException { + final String inputJson = """ + [{ + "books": [{ + "id": 1, + "title": "Book 1" + }, { + "id": 2, + "title": "Book 2" + }] + }, { + "books": [{ + "id": 3, + "title": "Book 3" + }, { + "id": 4, + "title": "Book 4" + }] + }]"""; + + final RecordSchema bookSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("id", RecordFieldType.INT.getDataType()), + new RecordField("title", RecordFieldType.STRING.getDataType()) + )); + + final List<String> ids = new ArrayList<>(); + try (final InputStream in = new ByteArrayInputStream(inputJson.getBytes(StandardCharsets.UTF_8)); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), bookSchema, dateFormat, timeFormat, timestampFormat, + StartingFieldStrategy.NESTED_FIELD, "books", SchemaApplicationStrategy.SELECTED_PART, null)) { + + Record record; + while ((record = reader.nextRecord()) != null) { + final String id = record.getAsString("id"); + ids.add(id); + } + } + + assertEquals(List.of("1", "2", "3", "4"), ids); + } + + @Test + public void testMultipleInputRecordsWithStartingFieldSingleObject() throws IOException, MalformedRecordException { + final String inputJson = """ + {"book": {"id": 1,"title": "Book 1"}} + {"book": {"id": 2,"title": "Book 2"}} + {"book": {"id": 3,"title": "Book 3"}} + {"book": {"id": 4,"title": "Book 4"}} + """; + + final RecordSchema bookSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("id", RecordFieldType.INT.getDataType()), + new RecordField("title", RecordFieldType.STRING.getDataType()) + )); + + final List<String> ids = new ArrayList<>(); + try (final InputStream in = new ByteArrayInputStream(inputJson.getBytes(StandardCharsets.UTF_8)); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), bookSchema, dateFormat, timeFormat, timestampFormat, + StartingFieldStrategy.NESTED_FIELD, "book", SchemaApplicationStrategy.SELECTED_PART, null)) { + + Record record; + while ((record = reader.nextRecord()) != null) { + final String id = record.getAsString("id"); + ids.add(id); + } + } + + assertEquals(List.of("1", "2", "3", "4"), ids); + } + + @Test void testReadUnicodeCharacters() throws IOException, MalformedRecordException { @@ -884,35 +954,35 @@ class TestJsonTreeRowRecordReader { // so we take the first one (INT, BOOLEAN) - as best effort - for both cases SimpleRecordSchema expectedSelectedRecordSchemaForRecordsInBothArrays = expectedChildSchema1; - List<Object> expected = Arrays.asList( - new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>() {{ - put("record", new Object[]{ - new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{ - put("integer", 11); - put("boolean", true); - put("extraString", "extraStringValue11"); - }}), - new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{ - put("integer", 12); - put("boolean", false); - put("extraString", "extraStringValue12"); - }}) - }); - }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>() {{ - put("record", new Object[]{ - new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{ - put("integer", 21); - put("extraString", "extraStringValue21"); - put("string", "stringValue21"); - }}), - new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{ - put("integer", 22); - put("extraString", "extraStringValue22"); - put("string", "stringValue22"); - }}) - }); - }}) + List<Object> expected = List.of( + new MapRecord(expectedRecordChoiceSchema, Map.of( + "record", new Object[] { + new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, Map.of( + "integer", 11, + "boolean", true, + "extraString", "extraStringValue11" + )), + new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, Map.of( + "integer", 12, + "boolean", false, + "extraString", "extraStringValue12" + )) + } + )), + new MapRecord(expectedRecordChoiceSchema, Map.of( + "record", new Object[] { + new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, Map.of( + "integer", 21, + "extraString", "extraStringValue21", + "string", "stringValue21" + )), + new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, Map.of( + "integer", 22, + "extraString", "extraStringValue22", + "string", "stringValue22" + )) + } + )) ); testReadRecords(jsonPath, schema, expected); @@ -927,15 +997,9 @@ class TestJsonTreeRowRecordReader { new RecordField("balance", RecordFieldType.DOUBLE.getDataType()) )); - List<Object> expected = Arrays.asList( - new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{ - put("id", 42); - put("balance", 4750.89); - }}), - new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{ - put("id", 43); - put("balance", 48212.38); - }}) + List<Object> expected = List.of( + new MapRecord(expectedRecordSchema, Map.of("id", 42, "balance", 4750.89)), + new MapRecord(expectedRecordSchema, Map.of("id", 43, "balance", 48212.38)) ); testReadRecords(jsonPath, expected, StartingFieldStrategy.NESTED_FIELD, "accounts"); @@ -950,12 +1014,7 @@ class TestJsonTreeRowRecordReader { new RecordField("balance", RecordFieldType.DOUBLE.getDataType()) )); - List<Object> expected = Collections.singletonList( - new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{ - put("id", 42); - put("balance", 4750.89); - }}) - ); + List<Object> expected = List.of(new MapRecord(expectedRecordSchema, Map.of("id", 42, "balance", 4750.89))); testReadRecords(jsonPath, expected, StartingFieldStrategy.NESTED_FIELD, "account"); } @@ -969,15 +1028,9 @@ class TestJsonTreeRowRecordReader { new RecordField("type", RecordFieldType.STRING.getDataType()) )); - List<Object> expected = Arrays.asList( - new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{ - put("id", "n312kj3"); - put("type", "employee"); - }}), - new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{ - put("id", "dl2kdff"); - put("type", "security"); - }}) + List<Object> expected = List.of( + new MapRecord(expectedRecordSchema, Map.of("id", "n312kj3", "type", "employee")), + new MapRecord(expectedRecordSchema, Map.of("id", "dl2kdff", "type", "security")) ); testReadRecords(jsonPath, expected, StartingFieldStrategy.NESTED_FIELD, "accountIds"); @@ -1005,20 +1058,14 @@ class TestJsonTreeRowRecordReader { void testStartFromNestedFieldThenStartObject() throws IOException, MalformedRecordException { String jsonPath = "src/test/resources/json/nested-array-then-start-object.json"; - SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList( + final SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList( new RecordField("id", RecordFieldType.INT.getDataType()), new RecordField("balance", RecordFieldType.DOUBLE.getDataType()) )); - List<Object> expected = Arrays.asList( - new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{ - put("id", 42); - put("balance", 4750.89); - }}), - new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{ - put("id", 43); - put("balance", 48212.38); - }}) + final List<Object> expected = List.of( + new MapRecord(expectedRecordSchema, Map.of("id", 42, "balance", 4750.89)), + new MapRecord(expectedRecordSchema, Map.of("id", 43, "balance", 48212.38)) ); testReadRecords(jsonPath, expectedRecordSchema, expected, StartingFieldStrategy.NESTED_FIELD, @@ -1029,22 +1076,19 @@ class TestJsonTreeRowRecordReader { void testStartFromNestedObjectWithWholeJsonSchemaScope() throws IOException, MalformedRecordException { String jsonPath = "src/test/resources/json/single-element-nested.json"; - RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList( + final RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList( new RecordField("id", RecordFieldType.INT.getDataType()), new RecordField("balance", RecordFieldType.DOUBLE.getDataType()) )); - RecordSchema recordSchema = new SimpleRecordSchema(Collections.singletonList( + final RecordSchema recordSchema = new SimpleRecordSchema(Collections.singletonList( new RecordField("account", RecordFieldType.RECORD.getRecordDataType(accountSchema)) )); - RecordSchema expectedRecordSchema = accountSchema; + final RecordSchema expectedRecordSchema = accountSchema; - List<Object> expected = Collections.singletonList( - new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{ - put("id", 42); - put("balance", 4750.89); - }}) + final List<Object> expected = List.of( + new MapRecord(expectedRecordSchema, Map.of("id", 42, "balance", 4750.89)) ); testReadRecords(jsonPath, recordSchema, expected, StartingFieldStrategy.NESTED_FIELD, @@ -1067,14 +1111,8 @@ class TestJsonTreeRowRecordReader { RecordSchema expectedRecordSchema = accountSchema; List<Object> expected = Arrays.asList( - new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{ - put("id", 42); - put("balance", 4750.89); - }}), - new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{ - put("id", 43); - put("balance", 48212.38); - }}) + new MapRecord(expectedRecordSchema, Map.of("id", 42, "balance", 4750.89)), + new MapRecord(expectedRecordSchema, Map.of("id", 43, "balance", 48212.38)) ); testReadRecords(jsonPath, recordSchema, expected, StartingFieldStrategy.NESTED_FIELD, @@ -1264,7 +1302,7 @@ class TestJsonTreeRowRecordReader { List<EqualsWrapper<Object>> wrappedExpected = EqualsWrapper.wrapList(expected, propertyProviders); List<EqualsWrapper<Object>> wrappedActual = EqualsWrapper.wrapList(actual, propertyProviders); - assertEquals(wrappedExpected, wrappedActual); + assertEquals(wrappedExpected, wrappedActual, "Expected: " + expected + ", Actual: " + actual); } }