This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e16286ef40 NIFI-12707 Allow LookupRecord to operate on multiple 
child-records
e16286ef40 is described below

commit e16286ef406f156b6836817261551d78a8447968
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Wed Jan 31 10:18:47 2024 -0500

    NIFI-12707 Allow LookupRecord to operate on multiple child-records
    
    - In order to accommodate this, also needed to improve DataTypeUtils so 
that it knows that Record A is wider than Record B if Record A contains all 
fields of Record B and more.
    
    - Removed unit tests and resources that are overly complex and no longer 
applicable
    - Fixed issue in unit test based on different line endings between 
operating systems
    
    This closes #8331
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../serialization/record/util/DataTypeUtils.java   |  72 +++--
 .../serialization/record/TestDataTypeUtils.java    | 107 ++++++++
 .../java/org/apache/nifi/util/MockFlowFile.java    |  19 +-
 .../nifi/processors/standard/LookupRecord.java     | 190 ++++++++-----
 .../nifi/processors/standard/TestLookupRecord.java | 297 +++++++++++++++++++++
 .../nifi/json/TestJsonTreeRowRecordReader.java     | 173 +-----------
 .../nifi/yaml/TestYamlTreeRowRecordReader.java     | 144 ----------
 ...-merged-embedded-arrays-and-single-records.json |  40 ---
 ...-merged-embedded-arrays-and-single-records.yaml |  19 --
 9 files changed, 600 insertions(+), 461 deletions(-)

diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index cebf33428b..000435e410 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1722,6 +1722,10 @@ public class DataTypeUtils {
                 if (otherArrayType.getElementType() == null) {
                     return Optional.of(thisDataType);
                 } else {
+                    final Optional<DataType> widerElementType = 
getWiderType(thisArrayType.getElementType(), otherArrayType.getElementType());
+                    if (widerElementType.isPresent()) {
+                        return 
Optional.of(RecordFieldType.ARRAY.getArrayDataType(widerElementType.get()));
+                    }
                     return Optional.empty();
                 }
             }
@@ -1792,37 +1796,61 @@ public class DataTypeUtils {
                     return Optional.of(thisDataType);
                 }
                 break;
+            case RECORD:
+                if (otherFieldType != RecordFieldType.RECORD)  {
+                    return Optional.empty();
+                }
+
+                final RecordDataType thisRecordDataType = (RecordDataType) 
thisDataType;
+                final RecordDataType otherRecordDataType = (RecordDataType) 
otherDataType;
+                return getWiderRecordType(thisRecordDataType, 
otherRecordDataType);
         }
 
         return Optional.empty();
     }
 
-    private static boolean isDecimalType(final RecordFieldType fieldType) {
-        switch (fieldType) {
-            case FLOAT:
-            case DOUBLE:
-            case DECIMAL:
-                return true;
-            default:
-                return false;
+    private static Optional<DataType> getWiderRecordType(final RecordDataType 
thisRecordDataType, final RecordDataType otherRecordDataType) {
+        final RecordSchema thisSchema = thisRecordDataType.getChildSchema();
+        final RecordSchema otherSchema = otherRecordDataType.getChildSchema();
+
+        if (thisSchema == null && otherSchema != null) {
+            return Optional.of(otherRecordDataType);
+        } else if (thisSchema != null && otherSchema == null) {
+            return Optional.of(thisRecordDataType);
+        } else if (thisSchema == null && otherSchema == null) {
+            return Optional.empty();
+        }
+
+        final Set<RecordField> thisFields = new 
HashSet<>(thisSchema.getFields());
+        final Set<RecordField> otherFields = new 
HashSet<>(otherSchema.getFields());
+
+        if (thisFields.containsAll(otherFields)) {
+            return Optional.of(thisRecordDataType);
+        }
+
+        if (otherFields.containsAll(thisFields)) {
+            return Optional.of(otherRecordDataType);
         }
+
+        return Optional.empty();
+    }
+
+    private static boolean isDecimalType(final RecordFieldType fieldType) {
+        return switch (fieldType) {
+            case FLOAT, DOUBLE, DECIMAL -> true;
+            default -> false;
+        };
     }
 
     private static int getIntegerTypeValue(final RecordFieldType fieldType) {
-        switch (fieldType) {
-            case BIGINT:
-                return 4;
-            case LONG:
-                return 3;
-            case INT:
-                return 2;
-            case SHORT:
-                return 1;
-            case BYTE:
-                return 0;
-            default:
-                return -1;
-        }
+        return switch (fieldType) {
+            case BIGINT -> 4;
+            case LONG -> 3;
+            case INT -> 2;
+            case SHORT -> 1;
+            case BYTE -> 0;
+            default -> -1;
+        };
     }
 
     /**
diff --git 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index bfe26859b0..6aca3af079 100644
--- 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.serialization.record;
 
 import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
@@ -73,6 +74,112 @@ public class TestDataTypeUtils {
         assertEquals(Optional.of(RecordFieldType.DOUBLE.getDataType()), 
DataTypeUtils.getWiderType(RecordFieldType.DOUBLE.getDataType(), 
RecordFieldType.INT.getDataType()));
     }
 
+    @Test
+    public void testWiderRecordWhenEqual() {
+        final Record smallRecord = DataTypeUtils.toRecord(Map.of(
+                "firstName", "John",
+                "lastName", "Doe",
+                "age", 30), "");
+
+        final Record duplicateRecord = DataTypeUtils.toRecord(Map.of(
+                "firstName", "John",
+                "lastName", "Doe",
+                "age", 30), "");
+
+        final Optional<DataType> widerType = 
DataTypeUtils.getWiderType(RecordFieldType.RECORD.getRecordDataType(smallRecord.getSchema()),
+            
RecordFieldType.RECORD.getRecordDataType(duplicateRecord.getSchema()));
+        assertTrue(widerType.isPresent());
+        assertEquals(((RecordDataType) widerType.get()).getChildSchema(), 
smallRecord.getSchema());
+    }
+
+    @Test
+    public void testWiderRecordWhenAllFieldsContainedWithin() {
+        final Record smallRecord = DataTypeUtils.toRecord(Map.of(
+                "firstName", "John",
+                "lastName", "Doe",
+                "age", 30), "");
+
+        final Record widerRecord = DataTypeUtils.toRecord(Map.of(
+                "firstName", "John",
+                "lastName", "Doe",
+                "fullName", "John Doe",
+                "age", 30), "");
+
+        final Optional<DataType> widerType = 
DataTypeUtils.getWiderType(RecordFieldType.RECORD.getRecordDataType(smallRecord.getSchema()),
+            RecordFieldType.RECORD.getRecordDataType(widerRecord.getSchema()));
+        assertTrue(widerType.isPresent());
+        assertEquals(((RecordDataType) widerType.get()).getChildSchema(), 
widerRecord.getSchema());
+    }
+
+    @Test
+    public void testWiderRecordDifferingFields() {
+        final Record firstRecord = DataTypeUtils.toRecord(Map.of(
+                "firstName", "John",
+                "lastName", "Doe",
+                "address", "123 Main Street",
+                "age", 30), "");
+
+        final Record secondRecord = DataTypeUtils.toRecord(Map.of(
+                "firstName", "John",
+                "lastName", "Doe",
+                "fullName", "John Doe",
+                "age", 30), "");
+
+        final Optional<DataType> widerType = 
DataTypeUtils.getWiderType(RecordFieldType.RECORD.getRecordDataType(firstRecord.getSchema()),
+            
RecordFieldType.RECORD.getRecordDataType(secondRecord.getSchema()));
+        assertFalse(widerType.isPresent());
+    }
+
+    @Test
+    public void testWiderRecordSameFieldNamesConflictingTypes() {
+        final Record firstRecord = DataTypeUtils.toRecord(Map.of(
+                "firstName", "John",
+                "lastName", "Doe",
+                "address", "123 Main Street",
+                "age", 30), "");
+
+        final Record secondRecord = DataTypeUtils.toRecord(Map.of(
+                "firstName", "John",
+                "lastName", "Doe",
+                "address", Map.of(
+                    "street", "123 Main Street",
+                    "city", "Main City",
+                    "state", "MS",
+                    "zip", 12345
+                    ),
+                "age", 30), "");
+
+        final Optional<DataType> widerType = 
DataTypeUtils.getWiderType(RecordFieldType.RECORD.getRecordDataType(firstRecord.getSchema()),
+            
RecordFieldType.RECORD.getRecordDataType(secondRecord.getSchema()));
+        assertFalse(widerType.isPresent());
+    }
+
+    @Test
+    public void testWiderRecordArray() {
+        final Record smallRecord = DataTypeUtils.toRecord(Map.of(
+                "firstName", "John",
+                "lastName", "Doe",
+                "age", 30), "");
+
+        final Record widerRecord = DataTypeUtils.toRecord(Map.of(
+                "firstName", "John",
+                "lastName", "Doe",
+                "fullName", "John Doe",
+                "age", 30), "");
+
+        final DataType smallRecordArray = 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(smallRecord.getSchema()));
+        final DataType widerRecordArray = 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(widerRecord.getSchema()));
+
+        final Optional<DataType> widerType = 
DataTypeUtils.getWiderType(smallRecordArray, widerRecordArray);
+        assertTrue(widerType.isPresent());
+
+        final ArrayDataType widerArrayType = (ArrayDataType) widerType.get();
+        final DataType elementType = widerArrayType.getElementType();
+        assertEquals(RecordFieldType.RECORD, elementType.getFieldType());
+        assertEquals(widerRecord.getSchema(), ((RecordDataType) 
elementType).getChildSchema());
+    }
+
+
     @Test
     public void testConvertRecordMapToJavaMap() {
         assertNull(DataTypeUtils.convertRecordMapToJavaMap(null, null));
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index ec86fe7656..8e5eaf3e14 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -278,16 +278,29 @@ public class MockFlowFile implements FlowFileRecord {
     }
 
     public void assertContentEquals(final String data) {
-        assertContentEquals(data, StandardCharsets.UTF_8);
+        assertContentEquals(data, false);
+    }
+
+    public void assertContentEquals(final String data, final boolean 
ignoreLineEndings) {
+        assertContentEquals(data, StandardCharsets.UTF_8, ignoreLineEndings);
     }
 
     public void assertContentEquals(final String data, final String charset) {
-        assertContentEquals(data, Charset.forName(charset));
+        assertContentEquals(data, Charset.forName(charset), false);
     }
 
+
     public void assertContentEquals(final String data, final Charset charset) {
+        assertContentEquals(data, charset, false);
+    }
+
+    public void assertContentEquals(final String data, final Charset charset, 
final boolean ignoreLineEndings) {
         final String value = new String(this.data, charset);
-        Assertions.assertEquals(data, value);
+        if (ignoreLineEndings) {
+            Assertions.assertEquals(data.replace("\r\n", "\n"), 
value.replace("\r\n", "\n"));
+        } else {
+            Assertions.assertEquals(data, value);
+        }
     }
 
     /**
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 0517f3d800..03f80ad6f0 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -82,6 +82,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 
 @SideEffectFree
@@ -116,14 +118,15 @@ public class LookupRecord extends AbstractProcessor {
         "Records will be routed to a 'success' Relationship regardless of 
whether or not there is a match in the configured Lookup Service");
     static final AllowableValue ROUTE_TO_MATCHED_UNMATCHED = new 
AllowableValue("route-to-matched-unmatched", "Route to 'matched' or 
'unmatched'",
         "Records will be routed to either a 'matched' or an 'unmatched' 
Relationship depending on whether or not there was a match in the configured 
Lookup Service. "
-            + "A single input FlowFile may result in two different output 
FlowFiles.");
+            + "A single input FlowFile may result in two different output 
FlowFiles. If the given Record Paths evaluate such that multiple sub-records 
are evaluated, the parent "
+            + "Record will be routed to 'unmatched' unless all sub-records 
match.");
 
     static final AllowableValue RESULT_ENTIRE_RECORD = new 
AllowableValue("insert-entire-record", "Insert Entire Record",
         "The entire Record that is retrieved from the Lookup Service will be 
inserted into the destination path.");
     static final AllowableValue RESULT_RECORD_FIELDS = new 
AllowableValue("record-fields", "Insert Record Fields",
         "All of the fields in the Record that is retrieved from the Lookup 
Service will be inserted into the destination path.");
 
-    static final AllowableValue USE_PROPERTY = new 
AllowableValue("use-property", "Use Property",
+    static final AllowableValue USE_PROPERTY = new 
AllowableValue("use-property", "Use \"Result RecordPath\" Property",
             "The \"Result RecordPath\" property will be used to determine 
which part of the record should be updated with the value returned by the 
Lookup Service");
     static final AllowableValue REPLACE_EXISTING_VALUES = new 
AllowableValue("replace-existing-values", "Replace Existing Values",
             "The \"Result RecordPath\" property will be ignored and the lookup 
service must be a single simple key lookup service. Every dynamic property 
value should "
@@ -155,15 +158,14 @@ public class LookupRecord extends AbstractProcessor {
         .required(true)
         .build();
 
-    static final PropertyDescriptor RESULT_RECORD_PATH = new 
PropertyDescriptor.Builder()
-        .name("result-record-path")
-        .displayName("Result RecordPath")
-        .description("A RecordPath that points to the field whose value should 
be updated with whatever value is returned from the Lookup Service. "
-            + "If not specified, the value that is returned from the Lookup 
Service will be ignored, except for determining whether the FlowFile should "
-            + "be routed to the 'matched' or 'unmatched' Relationship.")
+    static final PropertyDescriptor ROOT_RECORD_PATH = new 
PropertyDescriptor.Builder()
+        .name("Root Record Path")
+        .description("A RecordPath that points to a child Record within each 
of the top-level Records in the FlowFile. If specified, the additional 
RecordPath properties "
+                     + "will be evaluated against this child Record instead of 
the top-level Record. This allows for performing enrichment against multiple 
child Records within a single "
+                     + "top-level Record.")
         .addValidator(new RecordPathValidator())
-        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .required(false)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
 
     static final PropertyDescriptor RESULT_CONTENTS = new 
PropertyDescriptor.Builder()
@@ -196,6 +198,18 @@ public class LookupRecord extends AbstractProcessor {
         .required(true)
         .build();
 
+    static final PropertyDescriptor RESULT_RECORD_PATH = new 
PropertyDescriptor.Builder()
+        .name("result-record-path")
+        .displayName("Result RecordPath")
+        .description("A RecordPath that points to the field whose value should 
be updated with whatever value is returned from the Lookup Service. "
+                     + "If not specified, the value that is returned from the 
Lookup Service will be ignored, except for determining whether the FlowFile 
should "
+                     + "be routed to the 'matched' or 'unmatched' 
Relationship.")
+        .addValidator(new RecordPathValidator())
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .dependsOn(REPLACEMENT_STRATEGY, USE_PROPERTY)
+        .required(false)
+        .build();
+
     static final PropertyDescriptor CACHE_SIZE = new 
PropertyDescriptor.Builder()
         .name("record-path-lookup-miss-result-cache-size")
         .displayName("Cache Size")
@@ -248,10 +262,11 @@ public class LookupRecord extends AbstractProcessor {
         properties.add(RECORD_READER);
         properties.add(RECORD_WRITER);
         properties.add(LOOKUP_SERVICE);
-        properties.add(RESULT_RECORD_PATH);
+        properties.add(ROOT_RECORD_PATH);
         properties.add(ROUTING_STRATEGY);
         properties.add(RESULT_CONTENTS);
         properties.add(REPLACEMENT_STRATEGY);
+        properties.add(RESULT_RECORD_PATH);
         properties.add(CACHE_SIZE);
         return properties;
     }
@@ -286,9 +301,9 @@ public class LookupRecord extends AbstractProcessor {
 
         final Set<String> requiredKeys = 
validationContext.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys();
 
-        
if(validationContext.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue()))
 {
+        if 
(validationContext.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue()))
 {
             // it must be a single key lookup service
-            if(requiredKeys.size() != 1) {
+            if (requiredKeys.size() != 1) {
                 return Collections.singleton(new ValidationResult.Builder()
                         .subject(LOOKUP_SERVICE.getDisplayName())
                         .valid(false)
@@ -360,9 +375,12 @@ public class LookupRecord extends AbstractProcessor {
         final LookupContext lookupContext = createLookupContext(flowFile, 
context, session, writerFactory);
         final ReplacementStrategy replacementStrategy = 
createReplacementStrategy(context);
 
+        final String rootPath = 
context.getProperty(ROOT_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+        final RecordPath rootRecordPath = rootPath == null ? null : 
recordPathCache.getCompiled(rootPath);
+
         final RecordSchema enrichedSchema;
         try {
-            enrichedSchema = 
replacementStrategy.determineResultSchema(readerFactory, writerFactory, 
context, session, flowFile, lookupContext);
+            enrichedSchema = 
replacementStrategy.determineResultSchema(readerFactory, rootRecordPath, 
context, session, flowFile, lookupContext);
         } catch (final Exception e) {
             getLogger().error("Could not determine schema to use for enriched 
FlowFiles", e);
             session.transfer(original, REL_FAILURE);
@@ -379,7 +397,15 @@ public class LookupRecord extends AbstractProcessor {
 
                         Record record;
                         while ((record = reader.nextRecord()) != null) {
-                            final Set<Relationship> relationships = 
replacementStrategy.lookup(record, context, lookupContext);
+                            final List<Record> subRecords = 
getSubRecords(record, rootRecordPath);
+                            final Set<MatchResult> matchResults = new 
HashSet<>();
+                            for (final Record subRecord : subRecords) {
+                                final MatchResult matchResult = 
replacementStrategy.lookup(subRecord, context, lookupContext);
+                                matchResults.add(matchResult);
+                            }
+                            record.incorporateInactiveFields();
+
+                            final Set<Relationship> relationships = 
getRelationships(matchResults);
 
                             for (final Relationship relationship : 
relationships) {
                                 // Determine the Write Schema to use for each 
relationship
@@ -460,6 +486,35 @@ public class LookupRecord extends AbstractProcessor {
             flowFile, lookupContext.getRelationshipsUsed().size(), 
replacementStrategy.getLookupCount());
     }
 
+    private List<Record> getSubRecords(final Record record, final RecordPath 
rootRecordPath) {
+        if (rootRecordPath == null) {
+            return List.of(record);
+        } else {
+            // If RecordPath points to an array of Records or any Iterable 
value, flatMap that so that we have all of the Records.
+            // Filter out any non-records, and then return a List of Records.
+            return rootRecordPath.evaluate(record).getSelectedFields()
+                .map(FieldValue::getValue)
+                .flatMap(val -> switch (val) {
+                    case final Object[] recordArray -> 
Arrays.stream(recordArray);
+                    case Iterable<?> iterable -> 
StreamSupport.stream(iterable.spliterator(), false);
+                    case null, default -> Stream.of(val);
+                })
+                .filter(val -> val instanceof Record)
+                .map(Record.class::cast)
+                .toList();
+        }
+    }
+
+    private Set<Relationship> getRelationships(final Set<MatchResult> 
matchResults) {
+        if (matchResults.contains(MatchResult.SOME_MATCH) || 
(matchResults.contains(MatchResult.ALL_MATCH) && 
matchResults.contains(MatchResult.NONE_MATCH))) {
+            return routeToMatchedUnmatched ? UNMATCHED_COLLECTION : 
SUCCESS_COLLECTION;
+        } else if (matchResults.contains(MatchResult.ALL_MATCH)) {
+            return routeToMatchedUnmatched ? MATCHED_COLLECTION : 
SUCCESS_COLLECTION;
+        } else {
+            return routeToMatchedUnmatched ? UNMATCHED_COLLECTION : 
SUCCESS_COLLECTION;
+        }
+    }
+
     private ReplacementStrategy createReplacementStrategy(final ProcessContext 
context) {
         final boolean isInPlaceReplacement = 
context.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue());
 
@@ -475,7 +530,7 @@ public class LookupRecord extends AbstractProcessor {
         private int lookupCount = 0;
 
         @Override
-        public Set<Relationship> lookup(final Record record, final 
ProcessContext context, final LookupContext lookupContext) {
+        public MatchResult lookup(final Record record, final ProcessContext 
context, final LookupContext lookupContext) {
             lookupCount++;
 
             final Map<String, RecordPath> recordPaths = 
lookupContext.getRecordPathsByCoordinateKey();
@@ -484,6 +539,7 @@ public class LookupRecord extends AbstractProcessor {
             final FlowFile flowFile = lookupContext.getOriginalFlowFile();
 
             boolean hasUnmatchedValue = false;
+            boolean hasMatchedValue = false;
             for (final Map.Entry<String, RecordPath> entry : 
recordPaths.entrySet()) {
                 final RecordPath recordPath = entry.getValue();
 
@@ -494,7 +550,7 @@ public class LookupRecord extends AbstractProcessor {
                             selectedFieldsCount.incrementAndGet();
                             return fieldVal.getValue() != null;
                         })
-                        .collect(Collectors.toList());
+                        .toList();
 
                 if (selectedFieldsCount.get() == 0) {
                     // When selectedFieldsCount == 0; then an empty array was 
found which counts as a match.
@@ -504,12 +560,13 @@ public class LookupRecord extends AbstractProcessor {
 
                 if (lookupFieldValues.isEmpty()) {
                     final Set<Relationship> rels = routeToMatchedUnmatched ? 
UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
-                    getLogger().debug("RecordPath for property '{}' did not 
match any fields in a record for {}; routing record to {}", new Object[] 
{coordinateKey, flowFile, rels});
-                    return rels;
+                    getLogger().debug("RecordPath for property '{}' did not 
match any fields in a record for {}; routing record to {}", coordinateKey, 
flowFile, rels);
+                    return MatchResult.NONE_MATCH;
                 }
 
                 for (final FieldValue fieldValue : lookupFieldValues) {
-                    final Object coordinateValue = 
DataTypeUtils.convertType(fieldValue.getValue(), 
fieldValue.getField().getDataType(), null, null, null, 
fieldValue.getField().getFieldName());
+                    final Object coordinateValue = 
DataTypeUtils.convertType(fieldValue.getValue(), 
fieldValue.getField().getDataType(),
+                        Optional.empty(), Optional.empty(), Optional.empty(), 
fieldValue.getField().getFieldName());
 
                     lookupCoordinates.clear();
                     lookupCoordinates.put(coordinateKey, coordinateValue);
@@ -521,9 +578,11 @@ public class LookupRecord extends AbstractProcessor {
                         throw new ProcessException("Failed to lookup 
coordinates " + lookupCoordinates + " in Lookup Service", e);
                     }
 
-                    if (!lookupValueOption.isPresent()) {
+                    if (lookupValueOption.isEmpty()) {
                         hasUnmatchedValue = true;
                         continue;
+                    } else {
+                        hasMatchedValue = true;
                     }
 
                     final Object lookupValue = lookupValueOption.get();
@@ -533,15 +592,17 @@ public class LookupRecord extends AbstractProcessor {
                 }
             }
 
-            if (hasUnmatchedValue) {
-                return routeToMatchedUnmatched ? UNMATCHED_COLLECTION : 
SUCCESS_COLLECTION;
+            if (hasUnmatchedValue && hasMatchedValue) {
+                return MatchResult.SOME_MATCH;
+            } else if (hasUnmatchedValue) {
+                return MatchResult.NONE_MATCH;
             } else {
-                return routeToMatchedUnmatched ? MATCHED_COLLECTION : 
SUCCESS_COLLECTION;
+                return MatchResult.ALL_MATCH;
             }
         }
 
         @Override
-        public RecordSchema determineResultSchema(final RecordReaderFactory 
readerFactory, final RecordSetWriterFactory writerFactory, final ProcessContext 
context, final ProcessSession session,
+        public RecordSchema determineResultSchema(final RecordReaderFactory 
readerFactory, final RecordPath rootRecordPath, final ProcessContext context, 
final ProcessSession session,
                                                   final FlowFile flowFile, 
final LookupContext lookupContext) throws IOException, SchemaNotFoundException, 
MalformedRecordException {
 
             try (final InputStream in = session.read(flowFile);
@@ -564,7 +625,6 @@ public class LookupRecord extends AbstractProcessor {
         private volatile Cache<Map<String, Object>, Optional<?>> cache;
 
         public RecordPathReplacementStrategy(ProcessContext context) {
-
             final int cacheSize = 
context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger();
 
             if (this.cache == null || cacheSize > 0) {
@@ -575,13 +635,12 @@ public class LookupRecord extends AbstractProcessor {
         }
 
         @Override
-        public Set<Relationship> lookup(final Record record, final 
ProcessContext context, final LookupContext lookupContext) {
+        public MatchResult lookup(final Record record, final ProcessContext 
context, final LookupContext lookupContext) {
             lookupCount++;
 
             final Map<String, Object> lookupCoordinates = 
createLookupCoordinates(record, lookupContext, true);
             if (lookupCoordinates.isEmpty()) {
-                final Set<Relationship> rels = routeToMatchedUnmatched ? 
UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
-                return rels;
+                return MatchResult.NONE_MATCH;
             }
 
             final FlowFile flowFile = lookupContext.getOriginalFlowFile();
@@ -599,15 +658,13 @@ public class LookupRecord extends AbstractProcessor {
                 throw new ProcessException("Failed to lookup coordinates " + 
lookupCoordinates + " in Lookup Service", e);
             }
 
-            if (!lookupValueOption.isPresent()) {
-                final Set<Relationship> rels = routeToMatchedUnmatched ? 
UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
-                return rels;
+            if (lookupValueOption.isEmpty()) {
+                return MatchResult.NONE_MATCH;
             }
 
             applyLookupResult(record, context, lookupContext, 
lookupValueOption.get());
 
-            final Set<Relationship> rels = routeToMatchedUnmatched ? 
MATCHED_COLLECTION : SUCCESS_COLLECTION;
-            return rels;
+            return MatchResult.ALL_MATCH;
         }
 
         private void applyLookupResult(final Record record, final 
ProcessContext context, final LookupContext lookupContext, final Object 
lookupValue) {
@@ -628,9 +685,7 @@ public class LookupRecord extends AbstractProcessor {
                     resultPathResult.getSelectedFields().forEach(fieldVal -> {
                         final Object destinationValue = fieldVal.getValue();
 
-                        if (destinationValue instanceof Record) {
-                            final Record destinationRecord = (Record) 
destinationValue;
-
+                        if (destinationValue instanceof final Record 
destinationRecord) {
                             for (final String fieldName : 
lookupRecord.getRawFieldNames()) {
                                 final Object value = 
lookupRecord.getValue(fieldName);
 
@@ -663,7 +718,7 @@ public class LookupRecord extends AbstractProcessor {
         }
 
         @Override
-        public RecordSchema determineResultSchema(final RecordReaderFactory 
readerFactory, final RecordSetWriterFactory writerFactory, final ProcessContext 
context, final ProcessSession session,
+        public RecordSchema determineResultSchema(final RecordReaderFactory 
readerFactory, final RecordPath rootRecordPath, final ProcessContext context, 
final ProcessSession session,
                                                   final FlowFile flowFile, 
final LookupContext lookupContext)
                 throws IOException, SchemaNotFoundException, 
MalformedRecordException, LookupFailureException {
 
@@ -673,22 +728,26 @@ public class LookupRecord extends AbstractProcessor {
 
                 Record record;
                 while ((record = reader.nextRecord()) != null) {
-                    final Map<String, Object> lookupCoordinates = 
createLookupCoordinates(record, lookupContext, false);
-                    if (lookupCoordinates.isEmpty()) {
-                        continue;
-                    }
+                    final List<Record> subRecords = getSubRecords(record, 
rootRecordPath);
+                    for (final Record subRecord : subRecords) {
+                        final Map<String, Object> lookupCoordinates = 
createLookupCoordinates(subRecord, lookupContext, false);
+                        if (lookupCoordinates.isEmpty()) {
+                            continue;
+                        }
 
-                    final Optional<?> lookupResult = 
lookupService.lookup(lookupCoordinates, flowFileAttributes);
+                        final Optional<?> lookupResult = 
lookupService.lookup(lookupCoordinates, flowFileAttributes);
 
-                    cache.put(lookupCoordinates, lookupResult);
+                        cache.put(lookupCoordinates, lookupResult);
 
-                    if (!lookupResult.isPresent()) {
-                        continue;
-                    }
+                        if (lookupResult.isEmpty()) {
+                            continue;
+                        }
 
-                    applyLookupResult(record, context, lookupContext, 
lookupResult.get());
-                    getLogger().debug("Found a Record for {} that returned a 
result from the LookupService. Will provide the following schema to the Writer: 
{}", flowFile, record.getSchema());
-                    return record.getSchema();
+                        applyLookupResult(subRecord, context, lookupContext, 
lookupResult.get());
+                        getLogger().debug("Found a Record for {} that returned 
a result from the LookupService. Will provide the following schema to the 
Writer: {}", flowFile, record.getSchema());
+                        record.incorporateInactiveFields();
+                        return record.getSchema();
+                    }
                 }
 
                 getLogger().debug("Found no Record for {} that returned a 
result from the LookupService. Will provider Reader's schema to the Writer.", 
flowFile);
@@ -708,7 +767,7 @@ public class LookupRecord extends AbstractProcessor {
                 final RecordPathResult pathResult = 
recordPath.evaluate(record);
                 final List<FieldValue> lookupFieldValues = 
pathResult.getSelectedFields()
                     .filter(fieldVal -> fieldVal.getValue() != null)
-                    .collect(Collectors.toList());
+                    .toList();
 
                 if (lookupFieldValues.isEmpty()) {
                     if (logIfNotMatched) {
@@ -729,19 +788,14 @@ public class LookupRecord extends AbstractProcessor {
                     return Collections.emptyMap();
                 }
 
-                final FieldValue fieldValue = lookupFieldValues.get(0);
+                final FieldValue fieldValue = lookupFieldValues.getFirst();
+
+                final RecordField field = fieldValue.getField();
+                final DataType desiredType = field == null ? 
DataTypeUtils.inferDataType(fieldValue.getValue(), 
RecordFieldType.STRING.getDataType()) : field.getDataType();
+                final String fieldName = field == null ? coordinateKey : 
field.getFieldName();
+
                 final Object coordinateValue = DataTypeUtils.convertType(
-                        fieldValue.getValue(),
-                        Optional.ofNullable(fieldValue.getField())
-                                .map(RecordField::getDataType)
-                                
.orElse(DataTypeUtils.inferDataType(fieldValue.getValue(), 
RecordFieldType.STRING.getDataType())),
-                        null,
-                        null,
-                        null,
-                        Optional.ofNullable(fieldValue.getField())
-                                .map(RecordField::getFieldName)
-                                .orElse(coordinateKey)
-                );
+                    fieldValue.getValue(), desiredType, Optional.empty(), 
Optional.empty(), Optional.empty(), fieldName);
                 lookupCoordinates.put(coordinateKey, coordinateValue);
             }
 
@@ -778,17 +832,23 @@ public class LookupRecord extends AbstractProcessor {
         return new LookupContext(recordPaths, resultRecordPath, session, 
flowFile, writerFactory, getLogger());
     }
 
+    enum MatchResult {
+        ALL_MATCH,
+        NONE_MATCH,
+        SOME_MATCH;
+    }
+
     private interface ReplacementStrategy {
-        Set<Relationship> lookup(Record record, ProcessContext context, 
LookupContext lookupContext);
+        MatchResult lookup(Record record, ProcessContext context, 
LookupContext lookupContext);
 
-        RecordSchema determineResultSchema(RecordReaderFactory readerFactory, 
RecordSetWriterFactory writerFactory, ProcessContext context, ProcessSession 
session, FlowFile flowFile,
+        RecordSchema determineResultSchema(RecordReaderFactory readerFactory, 
RecordPath rootRecordPath, ProcessContext context, ProcessSession session, 
FlowFile flowFile,
                                            LookupContext lookupContext) throws 
IOException, SchemaNotFoundException, MalformedRecordException, 
LookupFailureException;
 
         int getLookupCount();
     }
 
 
-    private static class LookupContext {
+    protected static class LookupContext {
         private final Map<String, RecordPath> recordPathsByCoordinateKey;
         private final RecordPath resultRecordPath;
         private final ProcessSession session;
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
index f52b799383..dda81078e9 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -91,6 +91,303 @@ public class TestLookupRecord {
         recordReader.addRecord("Jimmy Doe", 14, null, null);
     }
 
+
+    private void setupForRootElement() throws InitializationException {
+        lookupService.addValue("file1.txt", "text/plain");
+        lookupService.addValue("file2.pdf", "application/pdf");
+        lookupService.addValue("file3.jpg", "image/jpeg");
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", jsonWriter);
+        runner.setProperty(jsonWriter, JsonRecordSetWriter.PRETTY_PRINT_JSON, 
"true");
+        runner.enableControllerService(jsonWriter);
+
+        runner.setProperty(LookupRecord.RECORD_READER, "reader");
+        runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
+        runner.setProperty(LookupRecord.ROOT_RECORD_PATH, "/fileSet/files[*]");
+        runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/mimeType");
+        runner.setProperty(LookupRecord.ROUTING_STRATEGY, 
LookupRecord.ROUTE_TO_SUCCESS);
+        runner.setProperty("lookup", "/filename");
+    }
+
+    @Test
+    public void testRootRecordSuccessSomeMatches() throws 
InitializationException {
+        setupForRootElement();
+
+        runner.enqueue("""
+            {
+              "fileSet": {
+                "id": "11223344",
+                "source": "external",
+                "files": [{
+                    "filename": "file1.txt",
+                    "size": 4810
+                  }, {
+                    "filename": "file2.pdf",
+                    "size": 47203782
+                  }, {
+                    "filename": "unknown-file.unk",
+                    "size": 278102
+                  }
+                ]
+              }
+            }
+            """.trim());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).getFirst();
+        out.assertContentEquals("""
+            [ {
+              "fileSet" : {
+                "id" : "11223344",
+                "source" : "external",
+                "files" : [ {
+                  "filename" : "file1.txt",
+                  "size" : 4810,
+                  "mimeType" : "text/plain"
+                }, {
+                  "filename" : "file2.pdf",
+                  "size" : 47203782,
+                  "mimeType" : "application/pdf"
+                }, {
+                  "filename" : "unknown-file.unk",
+                  "size" : 278102,
+                  "mimeType" : null
+                } ]
+              }
+            } ]
+            """.trim(), true);
+    }
+
+    @Test
+    public void testRootRecordSuccessAllMatch() throws InitializationException 
{
+        setupForRootElement();
+
+        runner.enqueue("""
+            {
+              "fileSet": {
+                "id": "11223344",
+                "source": "external",
+                "files": [{
+                    "filename": "file1.txt",
+                    "size": 4810
+                  }, {
+                    "filename": "file2.pdf",
+                    "size": 47203782
+                  }
+                ]
+              }
+            }
+            """.trim());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).getFirst();
+        out.assertContentEquals("""
+            [ {
+              "fileSet" : {
+                "id" : "11223344",
+                "source" : "external",
+                "files" : [ {
+                  "filename" : "file1.txt",
+                  "size" : 4810,
+                  "mimeType" : "text/plain"
+                }, {
+                  "filename" : "file2.pdf",
+                  "size" : 47203782,
+                  "mimeType" : "application/pdf"
+                } ]
+              }
+            } ]
+            """.trim(), true);
+    }
+
+    @Test
+    public void testRootRecordSuccessNoneMatch() throws 
InitializationException {
+        setupForRootElement();
+
+        runner.enqueue("""
+            {
+              "fileSet": {
+                "id": "11223344",
+                "source": "external",
+                "files": [{
+                    "filename": "abc.abc",
+                    "size": 4810
+                  }, {
+                    "filename": "xyz.xyz",
+                    "size": 47203782
+                  }
+                ]
+              }
+            }
+            """.trim());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS, 1);
+
+        // The output in this case will not even have a mimeType field because 
we never get a match in order to determine what the
+        // new record schema should look like. As a result, the schema remains 
unchanged and the mimeType field is not added.
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).getFirst();
+        out.assertContentEquals("""
+            [ {
+              "fileSet" : {
+                "id" : "11223344",
+                "source" : "external",
+                "files" : [ {
+                  "filename" : "abc.abc",
+                  "size" : 4810
+                }, {
+                  "filename" : "xyz.xyz",
+                  "size" : 47203782
+                } ]
+              }
+            } ]
+            """.trim(), true);
+    }
+
+    @Test
+    public void testRootRecordRouteToMatchedUnmatchedAllMatch() throws 
InitializationException {
+        setupForRootElement();
+        runner.setProperty(LookupRecord.ROUTING_STRATEGY, 
LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
+
+        runner.enqueue("""
+            {
+              "fileSet": {
+                "id": "11223344",
+                "source": "external",
+                "files": [{
+                    "filename": "file1.txt",
+                    "size": 4810
+                  }, {
+                    "filename": "file2.pdf",
+                    "size": 47203782
+                  }
+                ]
+              }
+            }
+            """.trim());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
+
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).getFirst();
+        out.assertContentEquals("""
+            [ {
+              "fileSet" : {
+                "id" : "11223344",
+                "source" : "external",
+                "files" : [ {
+                  "filename" : "file1.txt",
+                  "size" : 4810,
+                  "mimeType" : "text/plain"
+                }, {
+                  "filename" : "file2.pdf",
+                  "size" : 47203782,
+                  "mimeType" : "application/pdf"
+                } ]
+              }
+            } ]
+            """.trim(), true);
+    }
+
+    @Test
+    public void testRootRecordRouteToMatchedUnmatchedSomeMatch() throws 
InitializationException {
+        setupForRootElement();
+        runner.setProperty(LookupRecord.ROUTING_STRATEGY, 
LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
+
+        runner.enqueue("""
+            {
+              "fileSet": {
+                "id": "11223344",
+                "source": "external",
+                "files": [{
+                    "filename": "file1.txt",
+                    "size": 4810
+                  }, {
+                    "filename": "xyz.xyz",
+                    "size": 47203782
+                  }
+                ]
+              }
+            }
+            """.trim());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED, 1);
+
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).getFirst();
+        out.assertContentEquals("""
+            [ {
+              "fileSet" : {
+                "id" : "11223344",
+                "source" : "external",
+                "files" : [ {
+                  "filename" : "file1.txt",
+                  "size" : 4810,
+                  "mimeType" : "text/plain"
+                }, {
+                  "filename" : "xyz.xyz",
+                  "size" : 47203782,
+                  "mimeType" : null
+                } ]
+              }
+            } ]
+            """.trim(), true);
+    }
+
+    @Test
+    public void testRootRecordPointingToArray() throws InitializationException 
{
+        setupForRootElement();
+        runner.setProperty(LookupRecord.ROOT_RECORD_PATH, "/fileSet/files");
+
+        runner.enqueue("""
+            {
+              "fileSet": {
+                "id": "11223344",
+                "source": "external",
+                "files": [{
+                    "filename": "file1.txt",
+                    "size": 4810
+                  }, {
+                    "filename": "file2.pdf",
+                    "size": 47203782
+                  }
+                ]
+              }
+            }
+            """.trim());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).getFirst();
+        out.assertContentEquals("""
+            [ {
+              "fileSet" : {
+                "id" : "11223344",
+                "source" : "external",
+                "files" : [ {
+                  "filename" : "file1.txt",
+                  "size" : 4810,
+                  "mimeType" : "text/plain"
+                }, {
+                  "filename" : "file2.pdf",
+                  "size" : 47203782,
+                  "mimeType" : "application/pdf"
+                } ]
+              }
+            } ]
+            """.trim(), true);
+    }
+
     @Test
     public void testFlowfileAttributesPassed() {
         Map<String, String> attrs = new HashMap<>();
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index 2cb95ac050..b48c86d21b 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -772,7 +772,6 @@ class TestJsonTreeRowRecordReader {
 
     @Test
     void testMergeOfSimilarRecords() throws Exception {
-        // GIVEN
         String jsonPath = "src/test/resources/json/similar-records.json";
 
         RecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList(
@@ -798,14 +797,11 @@ class TestJsonTreeRowRecordReader {
             }})
         );
 
-        // WHEN
-        // THEN
         testReadRecords(jsonPath, expected);
     }
 
     @Test
     void testChoiceOfEmbeddedSimilarRecords() throws Exception {
-        // GIVEN
         String jsonPath = 
"src/test/resources/json/choice-of-embedded-similar-records.json";
 
         SimpleRecordSchema expectedRecordSchema1 = new 
SimpleRecordSchema(Arrays.asList(
@@ -838,165 +834,12 @@ class TestJsonTreeRowRecordReader {
             }})
         );
 
-        // WHEN
-        // THEN
         testReadRecords(jsonPath, expected);
     }
 
-    @Test
-    void testChoiceOfEmbeddedArraysAndSingleRecords() throws Exception {
-        // GIVEN
-        String jsonPath = 
"src/test/resources/json/choice-of-embedded-arrays-and-single-records.json";
-
-        SimpleRecordSchema expectedRecordSchema1 = new 
SimpleRecordSchema(Collections.singletonList(
-                new RecordField("integer", RecordFieldType.INT.getDataType())
-        ));
-        SimpleRecordSchema expectedRecordSchema2 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
-        ));
-        SimpleRecordSchema expectedRecordSchema3 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("string", RecordFieldType.STRING.getDataType())
-        ));
-        SimpleRecordSchema expectedRecordSchema4 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("string", RecordFieldType.STRING.getDataType())
-        ));
-        RecordSchema expectedRecordChoiceSchema = new 
SimpleRecordSchema(Collections.singletonList(
-                new RecordField("record", 
RecordFieldType.CHOICE.getChoiceDataType(
-                        
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
-                        
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
-                        
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
-                        
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
-                ))
-        ));
-
-        List<Object> expected = Arrays.asList(
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new MapRecord(expectedRecordSchema1, new 
HashMap<String, Object>(){{
-                    put("integer", 1);
-                }}));
-            }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new Object[]{
-                    new MapRecord(expectedRecordSchema2, new HashMap<String, 
Object>() {{
-                        put("integer", 21);
-                        put("boolean", true);
-                    }}),
-                    new MapRecord(expectedRecordSchema2, new HashMap<String, 
Object>() {{
-                        put("integer", 22);
-                        put("boolean", false);
-                    }})
-                });
-            }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new MapRecord(expectedRecordSchema3, new 
HashMap<String, Object>(){{
-                    put("integer", 3);
-                    put("string", "stringValue3");
-                }}));
-            }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new Object[]{
-                    new MapRecord(expectedRecordSchema4, new HashMap<String, 
Object>() {{
-                        put("integer", 41);
-                        put("string", "stringValue41");
-                    }}),
-                    new MapRecord(expectedRecordSchema4, new HashMap<String, 
Object>() {{
-                        put("integer", 42);
-                        put("string", "stringValue42");
-                    }})
-                });
-            }})
-        );
-
-        // WHEN
-        // THEN
-        testReadRecords(jsonPath, expected);
-    }
-
-    @Test
-    void testChoiceOfMergedEmbeddedArraysAndSingleRecords() throws Exception {
-        // GIVEN
-        String jsonPath = 
"src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json";
-
-        SimpleRecordSchema expectedRecordSchema1 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
-        ));
-        SimpleRecordSchema expectedRecordSchema2 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
-        ));
-        SimpleRecordSchema expectedRecordSchema3 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("string", RecordFieldType.STRING.getDataType())
-        ));
-        SimpleRecordSchema expectedRecordSchema4 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("string", RecordFieldType.STRING.getDataType()),
-            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
-        ));
-        RecordSchema expectedRecordChoiceSchema = new 
SimpleRecordSchema(Collections.singletonList(
-                new RecordField("record", 
RecordFieldType.CHOICE.getChoiceDataType(
-                        
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
-                        
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
-                        
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
-                        
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
-                ))
-        ));
-
-        List<Object> expected = Arrays.asList(
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new MapRecord(expectedRecordSchema1, new 
HashMap<String, Object>(){{
-                    put("integer", 1);
-                    put("boolean", false);
-                }}));
-            }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new Object[]{
-                    new MapRecord(expectedRecordSchema2, new HashMap<String, 
Object>() {{
-                        put("integer", 21);
-                        put("boolean", true);
-                    }}),
-                    new MapRecord(expectedRecordSchema2, new HashMap<String, 
Object>() {{
-                        put("integer", 22);
-                        put("boolean", false);
-                    }})
-                });
-            }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new MapRecord(expectedRecordSchema3, new 
HashMap<String, Object>(){{
-                    put("integer", 3);
-                    put("string", "stringValue3");
-                }}));
-            }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, 
Object>(){{
-                put("record", new Object[]{
-                    new MapRecord(expectedRecordSchema4, new HashMap<String, 
Object>() {{
-                        put("integer", 41);
-                        put("string", "stringValue41");
-                    }}),
-                    new MapRecord(expectedRecordSchema4, new HashMap<String, 
Object>() {{
-                        put("integer", 42);
-                        put("string", "stringValue42");
-                    }}),
-                    new MapRecord(expectedRecordSchema4, new HashMap<String, 
Object>() {{
-                        put("integer", 43);
-                        put("boolean", false);
-                    }})
-                });
-            }})
-        );
-
-        // WHEN
-        // THEN
-        testReadRecords(jsonPath, expected);
-    }
 
     @Test
     void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception {
-        // GIVEN
         String jsonPath = 
"src/test/resources/json/choice-of-different-arrays-with-extra-fields.json";
 
         SimpleRecordSchema recordSchema1 = new 
SimpleRecordSchema(Arrays.asList(
@@ -1072,8 +915,6 @@ class TestJsonTreeRowRecordReader {
             }})
         );
 
-        // WHEN
-        // THEN
         testReadRecords(jsonPath, schema, expected);
     }
 
@@ -1325,12 +1166,10 @@ class TestJsonTreeRowRecordReader {
         }
     }
 
-    private void testReadRecords(String jsonPath, List<Object> expected) 
throws IOException, MalformedRecordException {
-        final File jsonFile = new File(jsonPath);
-        try (
-            InputStream jsonStream = new 
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))
-        ) {
-            RecordSchema schema = inferSchema(jsonStream, 
StartingFieldStrategy.ROOT_NODE, null);
+    private void testReadRecords(String jsonFilename, List<Object> expected) 
throws IOException, MalformedRecordException {
+        final File jsonFile = new File(jsonFilename);
+        try (final InputStream jsonStream = new 
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
+            final RecordSchema schema = inferSchema(jsonStream, 
StartingFieldStrategy.ROOT_NODE, null);
             testReadRecords(jsonStream, schema, expected);
         }
     }
@@ -1360,9 +1199,7 @@ class TestJsonTreeRowRecordReader {
                                  List<Object> expected,
                                  StartingFieldStrategy strategy,
                                  String startingFieldName,
-                                 SchemaApplicationStrategy 
schemaApplicationStrategy
-    ) throws IOException, MalformedRecordException {
-
+                                 SchemaApplicationStrategy 
schemaApplicationStrategy) throws IOException, MalformedRecordException {
         final File jsonFile = new File(jsonPath);
         try (InputStream jsonStream = new 
ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
             testReadRecords(jsonStream, schema, expected, strategy, 
startingFieldName, schemaApplicationStrategy);
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
index 26493c4daf..c49b54e19d 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java
@@ -662,150 +662,6 @@ class TestYamlTreeRowRecordReader {
         testReadRecords(yamlPath, expected);
     }
 
-    @Test
-    void testChoiceOfEmbeddedArraysAndSingleRecords() throws Exception {
-        String yamlPath = 
"src/test/resources/yaml/choice-of-embedded-arrays-and-single-records.yaml";
-
-        final SimpleRecordSchema expectedRecordSchema1 = new 
SimpleRecordSchema(Collections.singletonList(
-                new RecordField("integer", RecordFieldType.INT.getDataType())
-        ));
-        final SimpleRecordSchema expectedRecordSchema2 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
-        ));
-        final SimpleRecordSchema expectedRecordSchema3 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("string", RecordFieldType.STRING.getDataType())
-        ));
-        final SimpleRecordSchema expectedRecordSchema4 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("string", RecordFieldType.STRING.getDataType())
-        ));
-        RecordSchema expectedRecordChoiceSchema = new 
SimpleRecordSchema(Collections.singletonList(
-                new RecordField("record", 
RecordFieldType.CHOICE.getChoiceDataType(
-                        
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
-                        
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
-                        
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
-                        
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
-                ))
-        ));
-
-        List<Object> expected = Arrays.asList(
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
-                put("record", new MapRecord(expectedRecordSchema1, new 
HashMap<>() {{
-                    put("integer", 1);
-                }}));
-            }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
-                put("record", new Object[]{
-                        new MapRecord(expectedRecordSchema2, new HashMap<>() {{
-                            put("integer", 21);
-                            put("boolean", true);
-                        }}),
-                        new MapRecord(expectedRecordSchema2, new HashMap<>() {{
-                            put("integer", 22);
-                            put("boolean", false);
-                        }})
-                });
-            }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
-                put("record", new MapRecord(expectedRecordSchema3, new 
HashMap<>() {{
-                    put("integer", 3);
-                    put("string", "stringValue3");
-                }}));
-            }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
-                put("record", new Object[]{
-                        new MapRecord(expectedRecordSchema4, new HashMap<>() {{
-                            put("integer", 41);
-                            put("string", "stringValue41");
-                        }}),
-                        new MapRecord(expectedRecordSchema4, new HashMap<>() {{
-                            put("integer", 42);
-                            put("string", "stringValue42");
-                        }})
-                });
-            }})
-        );
-
-        testReadRecords(yamlPath, expected);
-    }
-
-    @Test
-    void testChoiceOfMergedEmbeddedArraysAndSingleRecords() throws Exception {
-        String yamlPath = 
"src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml";
-
-        final SimpleRecordSchema expectedRecordSchema1 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
-        ));
-        final SimpleRecordSchema expectedRecordSchema2 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
-        ));
-        final SimpleRecordSchema expectedRecordSchema3 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("string", RecordFieldType.STRING.getDataType())
-        ));
-        final SimpleRecordSchema expectedRecordSchema4 = new 
SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType()),
-            new RecordField("string", RecordFieldType.STRING.getDataType()),
-            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
-        ));
-        RecordSchema expectedRecordChoiceSchema = new 
SimpleRecordSchema(Collections.singletonList(
-                new RecordField("record", 
RecordFieldType.CHOICE.getChoiceDataType(
-                        
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
-                        
RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
-                        
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
-                        
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
-                ))
-        ));
-
-        List<Object> expected = Arrays.asList(
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
-                put("record", new MapRecord(expectedRecordSchema1, new 
HashMap<>() {{
-                    put("integer", 1);
-                    put("boolean", false);
-                }}));
-            }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
-                put("record", new Object[]{
-                        new MapRecord(expectedRecordSchema2, new HashMap<>() {{
-                            put("integer", 21);
-                            put("boolean", true);
-                        }}),
-                        new MapRecord(expectedRecordSchema2, new HashMap<>() {{
-                            put("integer", 22);
-                            put("boolean", false);
-                        }})
-                });
-            }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
-                put("record", new MapRecord(expectedRecordSchema3, new 
HashMap<>() {{
-                    put("integer", 3);
-                    put("string", "stringValue3");
-                }}));
-            }}),
-            new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{
-                put("record", new Object[]{
-                        new MapRecord(expectedRecordSchema4, new HashMap<>() {{
-                            put("integer", 41);
-                            put("string", "stringValue41");
-                        }}),
-                        new MapRecord(expectedRecordSchema4, new HashMap<>() {{
-                            put("integer", 42);
-                            put("string", "stringValue42");
-                        }}),
-                        new MapRecord(expectedRecordSchema4, new HashMap<>() {{
-                            put("integer", 43);
-                            put("boolean", false);
-                        }})
-                });
-            }})
-        );
-
-        testReadRecords(yamlPath, expected);
-    }
 
     @Test
     void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception {
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json
deleted file mode 100644
index e1529518ae..0000000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json
+++ /dev/null
@@ -1,40 +0,0 @@
-{
-  "dataCollection":[
-    {
-      "record": {
-        "integer": 1,
-        "boolean": false
-      }
-    },
-    {
-      "record": [{
-        "integer": 21,
-        "boolean": true
-      },
-      {
-        "integer": 22,
-        "boolean": false
-      }]
-    },
-    {
-      "record": {
-        "integer": 3,
-        "string": "stringValue3"
-      }
-    },
-    {
-      "record": [{
-        "integer": 41,
-        "string": "stringValue41"
-      },
-      {
-        "integer": 42,
-        "string": "stringValue42"
-      },
-      {
-        "integer": 43,
-        "boolean": false
-      }]
-    }
-  ]
-}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml
deleted file mode 100644
index 603d3ddbbb..0000000000
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml
+++ /dev/null
@@ -1,19 +0,0 @@
-dataCollection:
-  - record:
-      integer: 1
-      boolean: false
-  - record:
-      - integer: 21
-        boolean: true
-      - integer: 22
-        boolean: false
-  - record:
-      integer: 3
-      string: stringValue3
-  - record:
-      - integer: 41
-        string: stringValue41
-      - integer: 42
-        string: stringValue42
-      - integer: 43
-        boolean: false

Reply via email to