This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 4636d9c79da053da178b26709e155db0f5680051
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Thu Jun 20 11:37:15 2024 -0400

    NIFI-13423 Corrected Starting Field handling in JsonTreeReader
    
    Ensure that when using a Starting Field Name, JsonTreeReader is still able to process all records
    
    This closes #8989
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../nifi/json/AbstractJsonRowRecordReader.java     | 153 ++++++++--------
 .../nifi/json/TestJsonTreeRowRecordReader.java     | 200 ++++++++++++---------
 2 files changed, 198 insertions(+), 155 deletions(-)

diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index c43cfd190f..d213e6ab36 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -83,21 +83,16 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
     private final String dateFormat;
     private final String timeFormat;
     private final String timestampFormat;
+    private final String nestedFieldName;
 
-    private boolean firstObjectConsumed = false;
-    private JsonParser jsonParser;
-    private JsonNode firstJsonNode;
-    private StartingFieldStrategy strategy;
-    private Map<String, String> capturedFields;
-    private BiPredicate<String, String> captureFieldPredicate;
+    private final JsonParser jsonParser;
+    private final StartingFieldStrategy strategy;
+    private final Map<String, String> capturedFields;
+    private final BiPredicate<String, String> captureFieldPredicate;
 
-    private AbstractJsonRowRecordReader(final ComponentLog logger, final 
String dateFormat, final String timeFormat, final String timestampFormat) {
-        this.logger = logger;
+    // Keeps track of whether or not we've skipped to the starting field for 
the current object when using the NESTED_FIELD strategy
+    private boolean skippedToStartField = false;
 
-        this.dateFormat = dateFormat;
-        this.timeFormat = timeFormat;
-        this.timestampFormat = timestampFormat;
-    }
 
     /**
      * Constructor with initial logic for JSON to NiFi record parsing.
@@ -130,7 +125,12 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
                                           final TokenParserFactory 
tokenParserFactory)
             throws IOException, MalformedRecordException {
 
-        this(logger, dateFormat, timeFormat, timestampFormat);
+        this.logger = logger;
+
+        this.dateFormat = dateFormat;
+        this.timeFormat = timeFormat;
+        this.timestampFormat = timestampFormat;
+        this.nestedFieldName = nestedFieldName;
 
         this.strategy = strategy;
         this.captureFieldPredicate = captureFieldPredicate;
@@ -141,28 +141,6 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
             jsonParser = tokenParserFactory.getJsonParser(in, 
configuredStreamReadConstraints, allowComments);
             jsonParser.enable(Feature.USE_FAST_DOUBLE_PARSER);
             jsonParser.enable(Feature.USE_FAST_BIG_NUMBER_PARSER);
-
-            if (strategy == StartingFieldStrategy.NESTED_FIELD) {
-                while (jsonParser.nextToken() != null) {
-                    if (nestedFieldName.equals(jsonParser.currentName())) {
-                        logger.debug("Parsing starting at nested field [{}]", 
nestedFieldName);
-                        break;
-                    }
-                    if (captureFieldPredicate != null) {
-                        captureCurrentField(captureFieldPredicate);
-                    }
-                }
-            }
-
-            JsonToken token = jsonParser.nextToken();
-            if (token == JsonToken.START_ARRAY) {
-                token = jsonParser.nextToken(); // advance to START_OBJECT 
token
-            }
-            if (token == JsonToken.START_OBJECT) { // could be END_ARRAY also
-                firstJsonNode = jsonParser.readValueAsTree();
-            } else {
-                firstJsonNode = null;
-            }
         } catch (final JsonParseException e) {
             throw new MalformedRecordException("Could not parse data as JSON", 
e);
         }
@@ -190,6 +168,7 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
                     captureCurrentField(captureFieldPredicate);
                 }
             }
+
             return null;
         }
 
@@ -302,7 +281,11 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
         return null;
     }
 
-    private void captureCurrentField(BiPredicate<String, String> 
captureFieldPredicate) throws IOException {
+    private void captureCurrentField(final BiPredicate<String, String> 
captureFieldPredicate) throws IOException {
+        if (captureFieldPredicate == null) {
+            return;
+        }
+
         if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME) {
             jsonParser.nextToken();
 
@@ -402,57 +385,79 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
         return new MapRecord(childSchema, childValues, serializedForm);
     }
 
-    protected JsonNode getNextJsonNode() throws IOException, 
MalformedRecordException {
-        if (!firstObjectConsumed) {
-            firstObjectConsumed = true;
-            return firstJsonNode;
-        }
-        if (strategy == StartingFieldStrategy.NESTED_FIELD) {
-            return getJsonNodeWithNestedNodeStrategy();
-        } else {
-            return getJsonNode();
-        }
 
-    }
+    private JsonNode getNextJsonNode() throws IOException, 
MalformedRecordException {
+        try {
+            while (true) {
+                final JsonToken token = jsonParser.nextToken();
+                if (token == null) {
+                    return null;
+                }
 
-    private JsonNode getJsonNodeWithNestedNodeStrategy() throws IOException, 
MalformedRecordException {
-        while (true) {
-            final JsonToken token = jsonParser.nextToken();
-            if (token == null) {
-                return null;
+                switch (token) {
+                    case START_ARRAY:
+                        break;
+                    case END_ARRAY:
+                    case END_OBJECT:
+                        skippedToStartField = false;
+                        break;
+                    case START_OBJECT:
+                        if (strategy == StartingFieldStrategy.NESTED_FIELD) {
+                            if (!skippedToStartField) {
+                                skipToStartingField();
+                                skippedToStartField = true;
+                                break;
+                            }
+                        }
+
+                        return jsonParser.readValueAsTree();
+                    default:
+                        // We got a token that isn't expected. This can happen 
when using the Nested Field Strategy.
+                        // For example, the field given has a String as a 
value instead of a Record. In this case, we want to skip to the next field.
+                        // Read to the end of the object/array
+                        skipToEndOfObject();
+                        break;
+                }
             }
+        } catch (final JsonParseException e) {
+            throw new MalformedRecordException("Failed to parse JSON", e);
+        }
+    }
+
+    private void skipToEndOfObject() throws IOException {
+        int depth = 0;
+        JsonToken token;
+        while ((token = jsonParser.nextToken()) != null) {
+            captureCurrentField(captureFieldPredicate);
 
             switch (token) {
-                case START_ARRAY:
+                case START_OBJECT:
+                    depth++;
                     break;
-                case END_ARRAY:
                 case END_OBJECT:
-                case FIELD_NAME:
-                    return null;
-                case START_OBJECT:
-                    return jsonParser.readValueAsTree();
+                    depth--;
+                    if (depth == 0) {
+                        return;
+                    }
+                    break;
                 default:
-                    throw new MalformedRecordException("Expected to get a JSON 
Object but got a token of type " + token.name());
+                    break;
             }
         }
     }
 
-    private JsonNode getJsonNode() throws IOException, 
MalformedRecordException {
-        while (true) {
-            final JsonToken token = jsonParser.nextToken();
-            if (token == null) {
-                return null;
-            }
+    private void skipToStartingField() throws IOException {
+        if (strategy != StartingFieldStrategy.NESTED_FIELD) {
+            return;
+        }
 
-            switch (token) {
-                case START_ARRAY:
-                case END_ARRAY:
-                case END_OBJECT:
-                    break;
-                case START_OBJECT:
-                    return jsonParser.readValueAsTree();
-                default:
-                    throw new MalformedRecordException("Expected to get a JSON 
Object but got a token of type " + token.name());
+        while (jsonParser.nextToken() != null) {
+            if (nestedFieldName.equals(jsonParser.currentName())) {
+                logger.debug("Parsing starting at nested field [{}]", 
nestedFieldName);
+                break;
+            }
+            if (captureFieldPredicate != null) {
+                captureCurrentField(captureFieldPredicate);
             }
         }
     }
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index 57a399f5ea..8f51faf53d 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -318,7 +318,6 @@ class TestJsonTreeRowRecordReader {
     void testReadJSONDisallowComments() {
         final MalformedRecordException mre = 
assertThrows(MalformedRecordException.class, () ->
             
testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", 
false, StreamReadConstraints.builder().maxStringLength(20_000).build()));
-        assertTrue(mre.getMessage().contains("not parse"));
     }
 
     private void testReadAccountJson(final String inputFile, final boolean 
allowComments, final StreamReadConstraints streamReadConstraints) throws 
IOException, MalformedRecordException {
@@ -713,6 +712,77 @@ class TestJsonTreeRowRecordReader {
         }
     }
 
+    @Test
+    public void testMultipleInputRecordsWithStartingFieldArray() throws 
IOException, MalformedRecordException {
+        final String inputJson = """
+            [{
+              "books": [{
+                "id": 1,
+                "title": "Book 1"
+              }, {
+                "id": 2,
+                "title": "Book 2"
+              }]
+            }, {
+              "books": [{
+                  "id": 3,
+                  "title": "Book 3"
+              }, {
+                "id": 4,
+                "title": "Book 4"
+              }]
+            }]""";
+
+        final RecordSchema bookSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("title", RecordFieldType.STRING.getDataType())
+        ));
+
+        final List<String> ids = new ArrayList<>();
+        try (final InputStream in = new 
ByteArrayInputStream(inputJson.getBytes(StandardCharsets.UTF_8));
+             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), bookSchema, dateFormat, 
timeFormat, timestampFormat,
+                 StartingFieldStrategy.NESTED_FIELD, "books", 
SchemaApplicationStrategy.SELECTED_PART, null)) {
+
+            Record record;
+            while ((record = reader.nextRecord()) != null) {
+                final String id = record.getAsString("id");
+                ids.add(id);
+            }
+        }
+
+        assertEquals(List.of("1", "2", "3", "4"), ids);
+    }
+
+    @Test
+    public void testMultipleInputRecordsWithStartingFieldSingleObject() throws 
IOException, MalformedRecordException {
+        final String inputJson = """
+            {"book": {"id": 1,"title": "Book 1"}}
+            {"book": {"id": 2,"title": "Book 2"}}
+            {"book": {"id": 3,"title": "Book 3"}}
+            {"book": {"id": 4,"title": "Book 4"}}
+            """;
+
+        final RecordSchema bookSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("id", RecordFieldType.INT.getDataType()),
+            new RecordField("title", RecordFieldType.STRING.getDataType())
+        ));
+
+        final List<String> ids = new ArrayList<>();
+        try (final InputStream in = new 
ByteArrayInputStream(inputJson.getBytes(StandardCharsets.UTF_8));
+             final JsonTreeRowRecordReader reader = new 
JsonTreeRowRecordReader(in, mock(ComponentLog.class), bookSchema, dateFormat, 
timeFormat, timestampFormat,
+                 StartingFieldStrategy.NESTED_FIELD, "book", 
SchemaApplicationStrategy.SELECTED_PART, null)) {
+
+            Record record;
+            while ((record = reader.nextRecord()) != null) {
+                final String id = record.getAsString("id");
+                ids.add(id);
+            }
+        }
+
+        assertEquals(List.of("1", "2", "3", "4"), ids);
+    }
+
+
 
     @Test
     void testReadUnicodeCharacters() throws IOException, 
MalformedRecordException {
@@ -884,35 +954,35 @@ class TestJsonTreeRowRecordReader {
         //  so we take the first one (INT, BOOLEAN) - as best effort - for 
both cases
         SimpleRecordSchema expectedSelectedRecordSchemaForRecordsInBothArrays 
= expectedChildSchema1;
 
-        List<Object> expected = Arrays.asList(
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
-                put("record", new Object[]{
-                    new 
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new 
HashMap<String, Object>() {{
-                        put("integer", 11);
-                        put("boolean", true);
-                        put("extraString", "extraStringValue11");
-                    }}),
-                    new 
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new 
HashMap<String, Object>() {{
-                        put("integer", 12);
-                        put("boolean", false);
-                        put("extraString", "extraStringValue12");
-                    }})
-                });
-            }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>() {{
-                put("record", new Object[]{
-                    new 
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new 
HashMap<String, Object>() {{
-                        put("integer", 21);
-                        put("extraString", "extraStringValue21");
-                        put("string", "stringValue21");
-                    }}),
-                    new 
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new 
HashMap<String, Object>() {{
-                        put("integer", 22);
-                        put("extraString", "extraStringValue22");
-                        put("string", "stringValue22");
-                    }})
-                });
-            }})
+        List<Object> expected = List.of(
+            new MapRecord(expectedRecordChoiceSchema, Map.of(
+                "record", new Object[] {
+                    new 
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, Map.of(
+                        "integer", 11,
+                        "boolean", true,
+                        "extraString", "extraStringValue11"
+                    )),
+                    new 
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, Map.of(
+                        "integer", 12,
+                        "boolean", false,
+                        "extraString", "extraStringValue12"
+                    ))
+                }
+            )),
+            new MapRecord(expectedRecordChoiceSchema, Map.of(
+                "record", new Object[] {
+                    new 
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, Map.of(
+                        "integer", 21,
+                        "extraString", "extraStringValue21",
+                        "string", "stringValue21"
+                    )),
+                    new 
MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, Map.of(
+                        "integer", 22,
+                        "extraString", "extraStringValue22",
+                        "string", "stringValue22"
+                    ))
+                }
+            ))
         );
 
         testReadRecords(jsonPath, schema, expected);
@@ -927,15 +997,9 @@ class TestJsonTreeRowRecordReader {
                 new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType())
         ));
 
-        List<Object> expected = Arrays.asList(
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
-                    put("id", 42);
-                    put("balance", 4750.89);
-                }}),
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
-                    put("id", 43);
-                    put("balance", 48212.38);
-                }})
+        List<Object> expected = List.of(
+            new MapRecord(expectedRecordSchema, Map.of("id", 42, "balance", 
4750.89)),
+            new MapRecord(expectedRecordSchema, Map.of("id", 43, "balance", 
48212.38))
         );
 
         testReadRecords(jsonPath, expected, 
StartingFieldStrategy.NESTED_FIELD, "accounts");
@@ -950,12 +1014,7 @@ class TestJsonTreeRowRecordReader {
                 new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType())
         ));
 
-        List<Object> expected = Collections.singletonList(
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
-                    put("id", 42);
-                    put("balance", 4750.89);
-                }})
-        );
+        List<Object> expected = List.of(new MapRecord(expectedRecordSchema, 
Map.of("id", 42, "balance", 4750.89)));
 
         testReadRecords(jsonPath, expected, 
StartingFieldStrategy.NESTED_FIELD, "account");
     }
@@ -969,15 +1028,9 @@ class TestJsonTreeRowRecordReader {
                 new RecordField("type", RecordFieldType.STRING.getDataType())
         ));
 
-        List<Object> expected = Arrays.asList(
-                    new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
-                        put("id", "n312kj3");
-                        put("type", "employee");
-                    }}),
-                    new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
-                        put("id", "dl2kdff");
-                        put("type", "security");
-                    }})
+        List<Object> expected = List.of(
+            new MapRecord(expectedRecordSchema, Map.of("id", "n312kj3", 
"type", "employee")),
+            new MapRecord(expectedRecordSchema, Map.of("id", "dl2kdff", 
"type", "security"))
         );
 
         testReadRecords(jsonPath, expected, 
StartingFieldStrategy.NESTED_FIELD, "accountIds");
@@ -1005,20 +1058,14 @@ class TestJsonTreeRowRecordReader {
     void testStartFromNestedFieldThenStartObject() throws IOException, 
MalformedRecordException {
         String jsonPath = 
"src/test/resources/json/nested-array-then-start-object.json";
 
-        SimpleRecordSchema expectedRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
+        final SimpleRecordSchema expectedRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
                 new RecordField("id", RecordFieldType.INT.getDataType()),
                 new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType())
         ));
 
-        List<Object> expected = Arrays.asList(
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
-                    put("id", 42);
-                    put("balance", 4750.89);
-                }}),
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
-                    put("id", 43);
-                    put("balance", 48212.38);
-                }})
+        final List<Object> expected = List.of(
+            new MapRecord(expectedRecordSchema, Map.of("id", 42, "balance", 
4750.89)),
+            new MapRecord(expectedRecordSchema, Map.of("id", 43, "balance", 
48212.38))
         );
 
         testReadRecords(jsonPath, expectedRecordSchema, expected, 
StartingFieldStrategy.NESTED_FIELD,
@@ -1029,22 +1076,19 @@ class TestJsonTreeRowRecordReader {
     void testStartFromNestedObjectWithWholeJsonSchemaScope() throws 
IOException, MalformedRecordException {
         String jsonPath = "src/test/resources/json/single-element-nested.json";
 
-        RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(
+        final RecordSchema accountSchema = new 
SimpleRecordSchema(Arrays.asList(
                 new RecordField("id", RecordFieldType.INT.getDataType()),
                 new RecordField("balance", 
RecordFieldType.DOUBLE.getDataType())
         ));
 
-        RecordSchema recordSchema = new 
SimpleRecordSchema(Collections.singletonList(
+        final RecordSchema recordSchema = new 
SimpleRecordSchema(Collections.singletonList(
                 new RecordField("account", 
RecordFieldType.RECORD.getRecordDataType(accountSchema))
         ));
 
-        RecordSchema expectedRecordSchema = accountSchema;
+        final RecordSchema expectedRecordSchema = accountSchema;
 
-        List<Object> expected = Collections.singletonList(
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
-                    put("id", 42);
-                    put("balance", 4750.89);
-                }})
+        final List<Object> expected = List.of(
+            new MapRecord(expectedRecordSchema, Map.of("id", 42, "balance", 
4750.89))
         );
 
         testReadRecords(jsonPath, recordSchema, expected, 
StartingFieldStrategy.NESTED_FIELD,
@@ -1067,14 +1111,8 @@ class TestJsonTreeRowRecordReader {
         RecordSchema expectedRecordSchema = accountSchema;
 
         List<Object> expected = Arrays.asList(
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
-                    put("id", 42);
-                    put("balance", 4750.89);
-                }}),
-                new MapRecord(expectedRecordSchema, new HashMap<String, 
Object>() {{
-                    put("id", 43);
-                    put("balance", 48212.38);
-                }})
+            new MapRecord(expectedRecordSchema, Map.of("id", 42, "balance", 
4750.89)),
+            new MapRecord(expectedRecordSchema, Map.of("id", 43, "balance", 
48212.38))
         );
 
         testReadRecords(jsonPath, recordSchema, expected, 
StartingFieldStrategy.NESTED_FIELD,
@@ -1264,7 +1302,7 @@ class TestJsonTreeRowRecordReader {
             List<EqualsWrapper<Object>> wrappedExpected = 
EqualsWrapper.wrapList(expected, propertyProviders);
             List<EqualsWrapper<Object>> wrappedActual = 
EqualsWrapper.wrapList(actual, propertyProviders);
 
-            assertEquals(wrappedExpected, wrappedActual);
+            assertEquals(wrappedExpected, wrappedActual, "Expected: " + 
expected + ", Actual: " + actual);
         }
     }
 

Reply via email to