This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new e16286ef40 NIFI-12707 Allow LookupRecord to operate on multiple child-records e16286ef40 is described below commit e16286ef406f156b6836817261551d78a8447968 Author: Mark Payne <marka...@hotmail.com> AuthorDate: Wed Jan 31 10:18:47 2024 -0500 NIFI-12707 Allow LookupRecord to operate on multiple child-records - In order to accommodate this, also needed to improve DataTypeUtils so that it knows that Record A is wider than Record B if Record A contains all fields of Record B and more. - Removed unit tests and resources that are overly complex and no longer applicable - Fixed issue in unit test based on different line endings between operating systems This closes #8331 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../serialization/record/util/DataTypeUtils.java | 72 +++-- .../serialization/record/TestDataTypeUtils.java | 107 ++++++++ .../java/org/apache/nifi/util/MockFlowFile.java | 19 +- .../nifi/processors/standard/LookupRecord.java | 190 ++++++++----- .../nifi/processors/standard/TestLookupRecord.java | 297 +++++++++++++++++++++ .../nifi/json/TestJsonTreeRowRecordReader.java | 173 +----------- .../nifi/yaml/TestYamlTreeRowRecordReader.java | 144 ---------- ...-merged-embedded-arrays-and-single-records.json | 40 --- ...-merged-embedded-arrays-and-single-records.yaml | 19 -- 9 files changed, 600 insertions(+), 461 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index cebf33428b..000435e410 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -1722,6 +1722,10 @@ public class DataTypeUtils { if (otherArrayType.getElementType() == null) { return Optional.of(thisDataType); } else { + final Optional<DataType> widerElementType = getWiderType(thisArrayType.getElementType(), otherArrayType.getElementType()); + if (widerElementType.isPresent()) { + return Optional.of(RecordFieldType.ARRAY.getArrayDataType(widerElementType.get())); + } return Optional.empty(); } } @@ -1792,37 +1796,61 @@ public class DataTypeUtils { return Optional.of(thisDataType); } break; + case RECORD: + if (otherFieldType != RecordFieldType.RECORD) { + return Optional.empty(); + } + + final RecordDataType thisRecordDataType = (RecordDataType) thisDataType; + final RecordDataType otherRecordDataType = (RecordDataType) otherDataType; + return getWiderRecordType(thisRecordDataType, otherRecordDataType); } return Optional.empty(); } - private static boolean isDecimalType(final RecordFieldType fieldType) { - switch (fieldType) { - case FLOAT: - case DOUBLE: - case DECIMAL: - return true; - default: - return false; + private static Optional<DataType> getWiderRecordType(final RecordDataType thisRecordDataType, final RecordDataType otherRecordDataType) { + final RecordSchema thisSchema = thisRecordDataType.getChildSchema(); + final RecordSchema otherSchema = otherRecordDataType.getChildSchema(); + + if (thisSchema == null && otherSchema != null) { + return Optional.of(otherRecordDataType); + } else if (thisSchema != null && otherSchema == null) { + return Optional.of(thisRecordDataType); + } else if (thisSchema == null && otherSchema == null) { + return Optional.empty(); + } + + final Set<RecordField> thisFields = new HashSet<>(thisSchema.getFields()); + final Set<RecordField> otherFields = new HashSet<>(otherSchema.getFields()); + + if (thisFields.containsAll(otherFields)) { + return Optional.of(thisRecordDataType); + } + + if (otherFields.containsAll(thisFields)) { + return Optional.of(otherRecordDataType); } + + return Optional.empty(); + } + + private static boolean isDecimalType(final RecordFieldType fieldType) { + return switch (fieldType) { + case FLOAT, DOUBLE, DECIMAL -> true; + default -> false; + }; } private static int getIntegerTypeValue(final RecordFieldType fieldType) { - switch (fieldType) { - case BIGINT: - return 4; - case LONG: - return 3; - case INT: - return 2; - case SHORT: - return 1; - case BYTE: - return 0; - default: - return -1; - } + return switch (fieldType) { + case BIGINT -> 4; + case LONG -> 3; + case INT -> 2; + case SHORT -> 1; + case BYTE -> 0; + default -> -1; + }; } /** diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java index bfe26859b0..6aca3af079 100644 --- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java @@ -18,6 +18,7 @@ package org.apache.nifi.serialization.record; import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; @@ -73,6 +74,112 @@ public class TestDataTypeUtils { assertEquals(Optional.of(RecordFieldType.DOUBLE.getDataType()), DataTypeUtils.getWiderType(RecordFieldType.DOUBLE.getDataType(), RecordFieldType.INT.getDataType())); } + @Test + public void testWiderRecordWhenEqual() { + final Record smallRecord = DataTypeUtils.toRecord(Map.of( + "firstName", "John", + "lastName", "Doe", + "age", 30), ""); + + final Record duplicateRecord = DataTypeUtils.toRecord(Map.of( + "firstName", "John", + "lastName", "Doe", + "age", 30), ""); + + final Optional<DataType> widerType = DataTypeUtils.getWiderType(RecordFieldType.RECORD.getRecordDataType(smallRecord.getSchema()), + RecordFieldType.RECORD.getRecordDataType(duplicateRecord.getSchema())); + assertTrue(widerType.isPresent()); + assertEquals(((RecordDataType) widerType.get()).getChildSchema(), smallRecord.getSchema()); + } + + @Test + public void testWiderRecordWhenAllFieldsContainedWithin() { + final Record smallRecord = DataTypeUtils.toRecord(Map.of( + "firstName", "John", + "lastName", "Doe", + "age", 30), ""); + + final Record widerRecord = DataTypeUtils.toRecord(Map.of( + "firstName", "John", + "lastName", "Doe", + "fullName", "John Doe", + "age", 30), ""); + + final Optional<DataType> widerType = DataTypeUtils.getWiderType(RecordFieldType.RECORD.getRecordDataType(smallRecord.getSchema()), + RecordFieldType.RECORD.getRecordDataType(widerRecord.getSchema())); + assertTrue(widerType.isPresent()); + assertEquals(((RecordDataType) widerType.get()).getChildSchema(), widerRecord.getSchema()); + } + + @Test + public void testWiderRecordDifferingFields() { + final Record firstRecord = DataTypeUtils.toRecord(Map.of( + "firstName", "John", + "lastName", "Doe", + "address", "123 Main Street", + "age", 30), ""); + + final Record secondRecord = DataTypeUtils.toRecord(Map.of( + "firstName", "John", + "lastName", "Doe", + "fullName", "John Doe", + "age", 30), ""); + + final Optional<DataType> widerType = DataTypeUtils.getWiderType(RecordFieldType.RECORD.getRecordDataType(firstRecord.getSchema()), + RecordFieldType.RECORD.getRecordDataType(secondRecord.getSchema())); + assertFalse(widerType.isPresent()); + } + + @Test + public void testWiderRecordSameFieldNamesConflictingTypes() { + final Record firstRecord = DataTypeUtils.toRecord(Map.of( + "firstName", "John", + "lastName", "Doe", + "address", "123 Main Street", + "age", 30), ""); + + final Record secondRecord = DataTypeUtils.toRecord(Map.of( + "firstName", "John", + "lastName", "Doe", + "address", Map.of( + "street", "123 Main Street", + "city", "Main City", + "state", "MS", + "zip", 12345 + ), + "age", 30), ""); + + final Optional<DataType> widerType = DataTypeUtils.getWiderType(RecordFieldType.RECORD.getRecordDataType(firstRecord.getSchema()), + RecordFieldType.RECORD.getRecordDataType(secondRecord.getSchema())); + assertFalse(widerType.isPresent()); + } + + @Test + public void testWiderRecordArray() { + final Record smallRecord = DataTypeUtils.toRecord(Map.of( + "firstName", "John", + "lastName", "Doe", + "age", 30), ""); + + final Record widerRecord = DataTypeUtils.toRecord(Map.of( + "firstName", "John", + "lastName", "Doe", + "fullName", "John Doe", + "age", 30), ""); + + final DataType smallRecordArray = RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(smallRecord.getSchema())); + final DataType widerRecordArray = RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(widerRecord.getSchema())); + + final Optional<DataType> widerType = DataTypeUtils.getWiderType(smallRecordArray, widerRecordArray); + assertTrue(widerType.isPresent()); + + final ArrayDataType widerArrayType = (ArrayDataType) widerType.get(); + final DataType elementType = widerArrayType.getElementType(); + assertEquals(RecordFieldType.RECORD, elementType.getFieldType()); + assertEquals(widerRecord.getSchema(), ((RecordDataType) elementType).getChildSchema()); + } + + @Test public void testConvertRecordMapToJavaMap() { assertNull(DataTypeUtils.convertRecordMapToJavaMap(null, null)); diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java index ec86fe7656..8e5eaf3e14 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java @@ -278,16 +278,29 @@ public class MockFlowFile implements FlowFileRecord { } public void assertContentEquals(final String data) { - assertContentEquals(data, StandardCharsets.UTF_8); + assertContentEquals(data, false); + } + + public void assertContentEquals(final String data, final boolean ignoreLineEndings) { + assertContentEquals(data, StandardCharsets.UTF_8, ignoreLineEndings); } public void assertContentEquals(final String data, final String charset) { - assertContentEquals(data, Charset.forName(charset)); + assertContentEquals(data, Charset.forName(charset), false); } + public void assertContentEquals(final String data, final Charset charset) { + assertContentEquals(data, charset, false); + } + + public void assertContentEquals(final String data, final Charset charset, final boolean ignoreLineEndings) { final String value = new String(this.data, charset); - Assertions.assertEquals(data, value); + if (ignoreLineEndings) { + Assertions.assertEquals(data.replace("\r\n", "\n"), value.replace("\r\n", "\n")); + } else { + Assertions.assertEquals(data, value); + } } /** diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index 0517f3d800..03f80ad6f0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -82,6 +82,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; @SideEffectFree @@ -116,14 +118,15 @@ public class LookupRecord extends AbstractProcessor { "Records will be routed to a 'success' Relationship regardless of whether or not there is a match in the configured Lookup Service"); static final AllowableValue ROUTE_TO_MATCHED_UNMATCHED = new AllowableValue("route-to-matched-unmatched", "Route to 'matched' or 'unmatched'", "Records will be routed to either a 'matched' or an 'unmatched' Relationship depending on whether or not there was a match in the configured Lookup Service. " - + "A single input FlowFile may result in two different output FlowFiles."); + + "A single input FlowFile may result in two different output FlowFiles. If the given Record Paths evaluate such that multiple sub-records are evaluated, the parent " + + "Record will be routed to 'unmatched' unless all sub-records match."); static final AllowableValue RESULT_ENTIRE_RECORD = new AllowableValue("insert-entire-record", "Insert Entire Record", "The entire Record that is retrieved from the Lookup Service will be inserted into the destination path."); static final AllowableValue RESULT_RECORD_FIELDS = new AllowableValue("record-fields", "Insert Record Fields", "All of the fields in the Record that is retrieved from the Lookup Service will be inserted into the destination path."); - static final AllowableValue USE_PROPERTY = new AllowableValue("use-property", "Use Property", + static final AllowableValue USE_PROPERTY = new AllowableValue("use-property", "Use \"Result RecordPath\" Property", "The \"Result RecordPath\" property will be used to determine which part of the record should be updated with the value returned by the Lookup Service"); static final AllowableValue REPLACE_EXISTING_VALUES = new AllowableValue("replace-existing-values", "Replace Existing Values", "The \"Result RecordPath\" property will be ignored and the lookup service must be a single simple key lookup service. Every dynamic property value should " @@ -155,15 +158,14 @@ public class LookupRecord extends AbstractProcessor { .required(true) .build(); - static final PropertyDescriptor RESULT_RECORD_PATH = new PropertyDescriptor.Builder() - .name("result-record-path") - .displayName("Result RecordPath") - .description("A RecordPath that points to the field whose value should be updated with whatever value is returned from the Lookup Service. " - + "If not specified, the value that is returned from the Lookup Service will be ignored, except for determining whether the FlowFile should " - + "be routed to the 'matched' or 'unmatched' Relationship.") + static final PropertyDescriptor ROOT_RECORD_PATH = new PropertyDescriptor.Builder() + .name("Root Record Path") + .description("A RecordPath that points to a child Record within each of the top-level Records in the FlowFile. If specified, the additional RecordPath properties " + + "will be evaluated against this child Record instead of the top-level Record. This allows for performing enrichment against multiple child Records within a single " + + "top-level Record.") .addValidator(new RecordPathValidator()) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor RESULT_CONTENTS = new PropertyDescriptor.Builder() @@ -196,6 +198,18 @@ public class LookupRecord extends AbstractProcessor { .required(true) .build(); + static final PropertyDescriptor RESULT_RECORD_PATH = new PropertyDescriptor.Builder() + .name("result-record-path") + .displayName("Result RecordPath") + .description("A RecordPath that points to the field whose value should be updated with whatever value is returned from the Lookup Service. " + + "If not specified, the value that is returned from the Lookup Service will be ignored, except for determining whether the FlowFile should " + + "be routed to the 'matched' or 'unmatched' Relationship.") + .addValidator(new RecordPathValidator()) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dependsOn(REPLACEMENT_STRATEGY, USE_PROPERTY) + .required(false) + .build(); + static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder() .name("record-path-lookup-miss-result-cache-size") .displayName("Cache Size") @@ -248,10 +262,11 @@ public class LookupRecord extends AbstractProcessor { properties.add(RECORD_READER); properties.add(RECORD_WRITER); properties.add(LOOKUP_SERVICE); - properties.add(RESULT_RECORD_PATH); + properties.add(ROOT_RECORD_PATH); properties.add(ROUTING_STRATEGY); properties.add(RESULT_CONTENTS); properties.add(REPLACEMENT_STRATEGY); + properties.add(RESULT_RECORD_PATH); properties.add(CACHE_SIZE); return properties; } @@ -286,9 +301,9 @@ public class LookupRecord extends AbstractProcessor { final Set<String> requiredKeys = validationContext.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys(); - if(validationContext.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue())) { + if (validationContext.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue())) { // it must be a single key lookup service - if(requiredKeys.size() != 1) { + if (requiredKeys.size() != 1) { return Collections.singleton(new ValidationResult.Builder() .subject(LOOKUP_SERVICE.getDisplayName()) .valid(false) @@ -360,9 +375,12 @@ public class LookupRecord extends AbstractProcessor { final LookupContext lookupContext = createLookupContext(flowFile, context, session, writerFactory); final ReplacementStrategy replacementStrategy = createReplacementStrategy(context); + final String rootPath = context.getProperty(ROOT_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue(); + final RecordPath rootRecordPath = rootPath == null ? null : recordPathCache.getCompiled(rootPath); + final RecordSchema enrichedSchema; try { - enrichedSchema = replacementStrategy.determineResultSchema(readerFactory, writerFactory, context, session, flowFile, lookupContext); + enrichedSchema = replacementStrategy.determineResultSchema(readerFactory, rootRecordPath, context, session, flowFile, lookupContext); } catch (final Exception e) { getLogger().error("Could not determine schema to use for enriched FlowFiles", e); session.transfer(original, REL_FAILURE); @@ -379,7 +397,15 @@ public class LookupRecord extends AbstractProcessor { Record record; while ((record = reader.nextRecord()) != null) { - final Set<Relationship> relationships = replacementStrategy.lookup(record, context, lookupContext); + final List<Record> subRecords = getSubRecords(record, rootRecordPath); + final Set<MatchResult> matchResults = new HashSet<>(); + for (final Record subRecord : subRecords) { + final MatchResult matchResult = replacementStrategy.lookup(subRecord, context, lookupContext); + matchResults.add(matchResult); + } + record.incorporateInactiveFields(); + + final Set<Relationship> relationships = getRelationships(matchResults); for (final Relationship relationship : relationships) { // Determine the Write Schema to use for each relationship @@ -460,6 +486,35 @@ public class LookupRecord extends AbstractProcessor { flowFile, lookupContext.getRelationshipsUsed().size(), replacementStrategy.getLookupCount()); } + private List<Record> getSubRecords(final Record record, final RecordPath rootRecordPath) { + if (rootRecordPath == null) { + return List.of(record); + } else { + // If RecordPath points to an array of Records or any Iterable value, flatMap that so that we have all of the Records. + // Filter out any non-records, and then return a List of Records. + return rootRecordPath.evaluate(record).getSelectedFields() + .map(FieldValue::getValue) + .flatMap(val -> switch (val) { + case final Object[] recordArray -> Arrays.stream(recordArray); + case Iterable<?> iterable -> StreamSupport.stream(iterable.spliterator(), false); + case null, default -> Stream.of(val); + }) + .filter(val -> val instanceof Record) + .map(Record.class::cast) + .toList(); + } + } + + private Set<Relationship> getRelationships(final Set<MatchResult> matchResults) { + if (matchResults.contains(MatchResult.SOME_MATCH) || (matchResults.contains(MatchResult.ALL_MATCH) && matchResults.contains(MatchResult.NONE_MATCH))) { + return routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; + } else if (matchResults.contains(MatchResult.ALL_MATCH)) { + return routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION; + } else { + return routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; + } + } + private ReplacementStrategy createReplacementStrategy(final ProcessContext context) { final boolean isInPlaceReplacement = context.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue()); @@ -475,7 +530,7 @@ public class LookupRecord extends AbstractProcessor { private int lookupCount = 0; @Override - public Set<Relationship> lookup(final Record record, final ProcessContext context, final LookupContext lookupContext) { + public MatchResult lookup(final Record record, final ProcessContext context, final LookupContext lookupContext) { lookupCount++; final Map<String, RecordPath> recordPaths = lookupContext.getRecordPathsByCoordinateKey(); @@ -484,6 +539,7 @@ public class LookupRecord extends AbstractProcessor { final FlowFile flowFile = lookupContext.getOriginalFlowFile(); boolean hasUnmatchedValue = false; + boolean hasMatchedValue = false; for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) { final RecordPath recordPath = entry.getValue(); @@ -494,7 +550,7 @@ public class LookupRecord extends AbstractProcessor { selectedFieldsCount.incrementAndGet(); return fieldVal.getValue() != null; }) - .collect(Collectors.toList()); + .toList(); if (selectedFieldsCount.get() == 0) { // When selectedFieldsCount == 0; then an empty array was found which counts as a match. @@ -504,12 +560,13 @@ public class LookupRecord extends AbstractProcessor { if (lookupFieldValues.isEmpty()) { final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; - getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", new Object[] {coordinateKey, flowFile, rels}); - return rels; + getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", coordinateKey, flowFile, rels); + return MatchResult.NONE_MATCH; } for (final FieldValue fieldValue : lookupFieldValues) { - final Object coordinateValue = DataTypeUtils.convertType(fieldValue.getValue(), fieldValue.getField().getDataType(), null, null, null, fieldValue.getField().getFieldName()); + final Object coordinateValue = DataTypeUtils.convertType(fieldValue.getValue(), fieldValue.getField().getDataType(), + Optional.empty(), Optional.empty(), Optional.empty(), fieldValue.getField().getFieldName()); lookupCoordinates.clear(); lookupCoordinates.put(coordinateKey, coordinateValue); @@ -521,9 +578,11 @@ public class LookupRecord extends AbstractProcessor { throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e); } - if (!lookupValueOption.isPresent()) { + if (lookupValueOption.isEmpty()) { hasUnmatchedValue = true; continue; + } else { + hasMatchedValue = true; } final Object lookupValue = lookupValueOption.get(); @@ -533,15 +592,17 @@ public class LookupRecord extends AbstractProcessor { } } - if (hasUnmatchedValue) { - return routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; + if (hasUnmatchedValue && hasMatchedValue) { + return MatchResult.SOME_MATCH; + } else if (hasUnmatchedValue) { + return MatchResult.NONE_MATCH; } else { - return routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION; + return MatchResult.ALL_MATCH; } } @Override - public RecordSchema determineResultSchema(final RecordReaderFactory readerFactory, final RecordSetWriterFactory writerFactory, final ProcessContext context, final ProcessSession session, + public RecordSchema determineResultSchema(final RecordReaderFactory readerFactory, final RecordPath rootRecordPath, final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final LookupContext lookupContext) throws IOException, SchemaNotFoundException, MalformedRecordException { try (final InputStream in = session.read(flowFile); @@ -564,7 +625,6 @@ public class LookupRecord extends AbstractProcessor { private volatile Cache<Map<String, Object>, Optional<?>> cache; public RecordPathReplacementStrategy(ProcessContext context) { - final int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger(); if (this.cache == null || cacheSize > 0) { @@ -575,13 +635,12 @@ public class LookupRecord extends AbstractProcessor { } @Override - public Set<Relationship> lookup(final Record record, final ProcessContext context, final LookupContext lookupContext) { + public MatchResult lookup(final Record record, final ProcessContext context, final LookupContext lookupContext) { lookupCount++; final Map<String, Object> lookupCoordinates = createLookupCoordinates(record, lookupContext, true); if (lookupCoordinates.isEmpty()) { - final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; - return rels; + return MatchResult.NONE_MATCH; } final FlowFile flowFile = lookupContext.getOriginalFlowFile(); @@ -599,15 +658,13 @@ public class LookupRecord extends AbstractProcessor { throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e); } - if (!lookupValueOption.isPresent()) { - final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; - return rels; + if (lookupValueOption.isEmpty()) { + return MatchResult.NONE_MATCH; } applyLookupResult(record, context, lookupContext, lookupValueOption.get()); - final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION; - return rels; + return MatchResult.ALL_MATCH; } private void applyLookupResult(final Record record, final ProcessContext context, final LookupContext lookupContext, final Object lookupValue) { @@ -628,9 +685,7 @@ public class LookupRecord extends AbstractProcessor { resultPathResult.getSelectedFields().forEach(fieldVal -> { final Object destinationValue = fieldVal.getValue(); - if (destinationValue instanceof Record) { - final Record destinationRecord = (Record) destinationValue; - + if (destinationValue instanceof final Record destinationRecord) { for (final String fieldName : lookupRecord.getRawFieldNames()) { final Object value = lookupRecord.getValue(fieldName); @@ -663,7 +718,7 @@ public class LookupRecord extends AbstractProcessor { } @Override - public RecordSchema determineResultSchema(final RecordReaderFactory readerFactory, final RecordSetWriterFactory writerFactory, final ProcessContext context, final ProcessSession session, + public RecordSchema determineResultSchema(final RecordReaderFactory readerFactory, final RecordPath rootRecordPath, final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final LookupContext lookupContext) throws IOException, SchemaNotFoundException, MalformedRecordException, LookupFailureException { @@ -673,22 +728,26 @@ public class LookupRecord extends AbstractProcessor { Record record; while ((record = reader.nextRecord()) != null) { - final Map<String, Object> lookupCoordinates = createLookupCoordinates(record, lookupContext, false); - if (lookupCoordinates.isEmpty()) { - continue; - } + final List<Record> subRecords = getSubRecords(record, rootRecordPath); + for (final Record subRecord : subRecords) { + final Map<String, Object> lookupCoordinates = createLookupCoordinates(subRecord, lookupContext, false); + if (lookupCoordinates.isEmpty()) { + continue; + } - final Optional<?> lookupResult = lookupService.lookup(lookupCoordinates, flowFileAttributes); + final Optional<?> lookupResult = lookupService.lookup(lookupCoordinates, flowFileAttributes); - cache.put(lookupCoordinates, lookupResult); + cache.put(lookupCoordinates, lookupResult); - if (!lookupResult.isPresent()) { - continue; - } + if (lookupResult.isEmpty()) { + continue; + } - applyLookupResult(record, context, lookupContext, lookupResult.get()); - getLogger().debug("Found a Record for {} that returned a result from the LookupService. Will provide the following schema to the Writer: {}", flowFile, record.getSchema()); - return record.getSchema(); + applyLookupResult(subRecord, context, lookupContext, lookupResult.get()); + getLogger().debug("Found a Record for {} that returned a result from the LookupService. Will provide the following schema to the Writer: {}", flowFile, record.getSchema()); + record.incorporateInactiveFields(); + return record.getSchema(); + } } getLogger().debug("Found no Record for {} that returned a result from the LookupService. Will provider Reader's schema to the Writer.", flowFile); @@ -708,7 +767,7 @@ public class LookupRecord extends AbstractProcessor { final RecordPathResult pathResult = recordPath.evaluate(record); final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields() .filter(fieldVal -> fieldVal.getValue() != null) - .collect(Collectors.toList()); + .toList(); if (lookupFieldValues.isEmpty()) { if (logIfNotMatched) { @@ -729,19 +788,14 @@ public class LookupRecord extends AbstractProcessor { return Collections.emptyMap(); } - final FieldValue fieldValue = lookupFieldValues.get(0); + final FieldValue fieldValue = lookupFieldValues.getFirst(); + + final RecordField field = fieldValue.getField(); + final DataType desiredType = field == null ? DataTypeUtils.inferDataType(fieldValue.getValue(), RecordFieldType.STRING.getDataType()) : field.getDataType(); + final String fieldName = field == null ? coordinateKey : field.getFieldName(); + final Object coordinateValue = DataTypeUtils.convertType( - fieldValue.getValue(), - Optional.ofNullable(fieldValue.getField()) - .map(RecordField::getDataType) - .orElse(DataTypeUtils.inferDataType(fieldValue.getValue(), RecordFieldType.STRING.getDataType())), - null, - null, - null, - Optional.ofNullable(fieldValue.getField()) - .map(RecordField::getFieldName) - .orElse(coordinateKey) - ); + fieldValue.getValue(), desiredType, Optional.empty(), Optional.empty(), Optional.empty(), fieldName); lookupCoordinates.put(coordinateKey, coordinateValue); } @@ -778,17 +832,23 @@ public class LookupRecord extends AbstractProcessor { return new LookupContext(recordPaths, resultRecordPath, session, flowFile, writerFactory, getLogger()); } + enum MatchResult { + ALL_MATCH, + NONE_MATCH, + SOME_MATCH; + } + private interface ReplacementStrategy { - Set<Relationship> lookup(Record record, ProcessContext context, LookupContext lookupContext); + MatchResult lookup(Record record, ProcessContext context, LookupContext lookupContext); - RecordSchema determineResultSchema(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, ProcessContext context, ProcessSession session, FlowFile flowFile, + RecordSchema determineResultSchema(RecordReaderFactory readerFactory, RecordPath rootRecordPath, ProcessContext context, ProcessSession session, FlowFile flowFile, LookupContext lookupContext) throws IOException, SchemaNotFoundException, MalformedRecordException, LookupFailureException; int getLookupCount(); } - private static class LookupContext { + protected static class LookupContext { private final Map<String, RecordPath> recordPathsByCoordinateKey; private final RecordPath resultRecordPath; private final ProcessSession session; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java index f52b799383..dda81078e9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java @@ -91,6 +91,303 @@ public class TestLookupRecord { recordReader.addRecord("Jimmy Doe", 14, null, null); } + + private void setupForRootElement() throws InitializationException { + lookupService.addValue("file1.txt", "text/plain"); + lookupService.addValue("file2.pdf", "application/pdf"); + lookupService.addValue("file3.jpg", "image/jpeg"); + + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); + runner.enableControllerService(jsonWriter); + + runner.setProperty(LookupRecord.RECORD_READER, "reader"); + runner.setProperty(LookupRecord.RECORD_WRITER, "writer"); + runner.setProperty(LookupRecord.ROOT_RECORD_PATH, "/fileSet/files[*]"); + runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/mimeType"); + runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS); + runner.setProperty("lookup", "/filename"); + } + + @Test + public void testRootRecordSuccessSomeMatches() throws InitializationException { + setupForRootElement(); + + runner.enqueue(""" + { + "fileSet": { + "id": "11223344", + "source": "external", + "files": [{ + "filename": "file1.txt", + "size": 4810 + }, { + "filename": "file2.pdf", + "size": 47203782 + }, { + "filename": "unknown-file.unk", + "size": 278102 + } + ] + } + } + """.trim()); + + runner.run(); + runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS, 1); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).getFirst(); + out.assertContentEquals(""" + [ { + "fileSet" : { + "id" : "11223344", + "source" : "external", + "files" : [ { + "filename" : "file1.txt", + "size" : 4810, + "mimeType" : "text/plain" + }, { + "filename" : "file2.pdf", + "size" : 47203782, + "mimeType" : "application/pdf" + }, { + "filename" : "unknown-file.unk", + "size" : 278102, + "mimeType" : null + } ] + } + } ] + """.trim(), true); + } + + @Test + public void testRootRecordSuccessAllMatch() throws InitializationException { + setupForRootElement(); + + runner.enqueue(""" + { + "fileSet": { + "id": "11223344", + "source": "external", + "files": [{ + "filename": "file1.txt", + "size": 4810 + }, { + "filename": "file2.pdf", + "size": 47203782 + } + ] + } + } + """.trim()); + + runner.run(); + runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS, 1); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).getFirst(); + out.assertContentEquals(""" + [ { + "fileSet" : { + "id" : "11223344", + "source" : "external", + "files" : [ { + "filename" : "file1.txt", + "size" : 4810, + "mimeType" : "text/plain" + }, { + "filename" : "file2.pdf", + "size" : 47203782, + "mimeType" : "application/pdf" + } ] + } + } ] + """.trim(), true); + } + + @Test + public void testRootRecordSuccessNoneMatch() throws InitializationException { + setupForRootElement(); + + runner.enqueue(""" + { + "fileSet": { + "id": "11223344", + "source": "external", + "files": [{ + "filename": "abc.abc", + "size": 4810 + }, { + "filename": "xyz.xyz", + "size": 47203782 + } + ] + } + } + """.trim()); + + runner.run(); + runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS, 1); + + // The output in this case will not even have a mimeType field because we never get a match in order to determine what the + // new record schema should look like. As a result, the schema remains unchanged and the mimeType field is not added. + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).getFirst(); + out.assertContentEquals(""" + [ { + "fileSet" : { + "id" : "11223344", + "source" : "external", + "files" : [ { + "filename" : "abc.abc", + "size" : 4810 + }, { + "filename" : "xyz.xyz", + "size" : 47203782 + } ] + } + } ] + """.trim(), true); + } + + @Test + public void testRootRecordRouteToMatchedUnmatchedAllMatch() throws InitializationException { + setupForRootElement(); + runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED); + + runner.enqueue(""" + { + "fileSet": { + "id": "11223344", + "source": "external", + "files": [{ + "filename": "file1.txt", + "size": 4810 + }, { + "filename": "file2.pdf", + "size": 47203782 + } + ] + } + } + """.trim()); + + runner.run(); + runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).getFirst(); + out.assertContentEquals(""" + [ { + "fileSet" : { + "id" : "11223344", + "source" : "external", + "files" : [ { + "filename" : "file1.txt", + "size" : 4810, + "mimeType" : "text/plain" + }, { + "filename" : "file2.pdf", + "size" : 47203782, + "mimeType" : "application/pdf" + } ] + } + } ] + """.trim(), true); + } + + @Test + public void testRootRecordRouteToMatchedUnmatchedSomeMatch() throws InitializationException { + setupForRootElement(); + runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED); + + runner.enqueue(""" + { + "fileSet": { + "id": "11223344", + "source": "external", + "files": [{ + "filename": "file1.txt", + "size": 4810 + }, { + "filename": "xyz.xyz", + "size": 47203782 + } + ] + } + } + """.trim()); + + runner.run(); + runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED, 1); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).getFirst(); + out.assertContentEquals(""" + [ { + "fileSet" : { + "id" : "11223344", + "source" : "external", + "files" : [ { + "filename" : "file1.txt", + "size" : 4810, + "mimeType" : "text/plain" + }, { + "filename" : "xyz.xyz", + "size" : 47203782, + "mimeType" : null + } ] + } + } ] + """.trim(), true); + } + + @Test + public void testRootRecordPointingToArray() throws InitializationException { + setupForRootElement(); + runner.setProperty(LookupRecord.ROOT_RECORD_PATH, "/fileSet/files"); + + runner.enqueue(""" + { + "fileSet": { + "id": "11223344", + "source": "external", + "files": [{ + "filename": "file1.txt", + "size": 4810 + }, { + "filename": "file2.pdf", + "size": 47203782 + } + ] + } + } + """.trim()); + + runner.run(); + runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS, 1); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).getFirst(); + out.assertContentEquals(""" + [ { + "fileSet" : { + "id" : "11223344", + "source" : "external", + "files" : [ { + "filename" : "file1.txt", + "size" : 4810, + "mimeType" : "text/plain" + }, { + "filename" : "file2.pdf", + "size" : 47203782, + "mimeType" : "application/pdf" + } ] + } + } ] + """.trim(), true); + } + @Test public void testFlowfileAttributesPassed() { Map<String, String> attrs = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java index 2cb95ac050..b48c86d21b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java @@ -772,7 +772,6 @@ class TestJsonTreeRowRecordReader { @Test void testMergeOfSimilarRecords() throws Exception { - // GIVEN String jsonPath = "src/test/resources/json/similar-records.json"; RecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList( @@ -798,14 +797,11 @@ class TestJsonTreeRowRecordReader { }}) ); - // WHEN - // THEN testReadRecords(jsonPath, expected); } @Test void testChoiceOfEmbeddedSimilarRecords() throws Exception { - // GIVEN String jsonPath = "src/test/resources/json/choice-of-embedded-similar-records.json"; SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList( @@ -838,165 +834,12 @@ class TestJsonTreeRowRecordReader { }}) ); - // WHEN - // THEN testReadRecords(jsonPath, expected); } - @Test - void testChoiceOfEmbeddedArraysAndSingleRecords() throws Exception { - // GIVEN - String jsonPath = "src/test/resources/json/choice-of-embedded-arrays-and-single-records.json"; - - SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Collections.singletonList( - new RecordField("integer", RecordFieldType.INT.getDataType()) - )); - SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) - )); - SimpleRecordSchema expectedRecordSchema3 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("string", RecordFieldType.STRING.getDataType()) - )); - SimpleRecordSchema expectedRecordSchema4 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("string", RecordFieldType.STRING.getDataType()) - )); - RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Collections.singletonList( - new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType( - RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1), - RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3), - RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)), - RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4)) - )) - )); - - List<Object> expected = Arrays.asList( - new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ - put("record", new MapRecord(expectedRecordSchema1, new HashMap<String, Object>(){{ - put("integer", 1); - }})); - }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ - put("record", new Object[]{ - new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{ - put("integer", 21); - put("boolean", true); - }}), - new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{ - put("integer", 22); - put("boolean", false); - }}) - }); - }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ - put("record", new MapRecord(expectedRecordSchema3, new HashMap<String, Object>(){{ - put("integer", 3); - put("string", "stringValue3"); - }})); - }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ - put("record", new Object[]{ - new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{ - put("integer", 41); - put("string", "stringValue41"); - }}), - new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{ - put("integer", 42); - put("string", "stringValue42"); - }}) - }); - }}) - ); - - // WHEN - // THEN - testReadRecords(jsonPath, expected); - } - - @Test - void testChoiceOfMergedEmbeddedArraysAndSingleRecords() throws Exception { - // GIVEN - String jsonPath = "src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json"; - - SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) - )); - SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) - )); - SimpleRecordSchema expectedRecordSchema3 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("string", RecordFieldType.STRING.getDataType()) - )); - SimpleRecordSchema expectedRecordSchema4 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("string", RecordFieldType.STRING.getDataType()), - new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) - )); - RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Collections.singletonList( - new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType( - RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1), - RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3), - RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)), - RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4)) - )) - )); - - List<Object> expected = Arrays.asList( - new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ - put("record", new MapRecord(expectedRecordSchema1, new HashMap<String, Object>(){{ - put("integer", 1); - put("boolean", false); - }})); - }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ - put("record", new Object[]{ - new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{ - put("integer", 21); - put("boolean", true); - }}), - new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{ - put("integer", 22); - put("boolean", false); - }}) - }); - }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ - put("record", new MapRecord(expectedRecordSchema3, new HashMap<String, Object>(){{ - put("integer", 3); - put("string", "stringValue3"); - }})); - }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{ - put("record", new Object[]{ - new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{ - put("integer", 41); - put("string", "stringValue41"); - }}), - new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{ - put("integer", 42); - put("string", "stringValue42"); - }}), - new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{ - put("integer", 43); - put("boolean", false); - }}) - }); - }}) - ); - - // WHEN - // THEN - testReadRecords(jsonPath, expected); - } @Test void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception { - // GIVEN String jsonPath = "src/test/resources/json/choice-of-different-arrays-with-extra-fields.json"; SimpleRecordSchema recordSchema1 = new SimpleRecordSchema(Arrays.asList( @@ -1072,8 +915,6 @@ class TestJsonTreeRowRecordReader { }}) ); - // WHEN - // THEN testReadRecords(jsonPath, schema, expected); } @@ -1325,12 +1166,10 @@ class TestJsonTreeRowRecordReader { } } - private void testReadRecords(String jsonPath, List<Object> expected) throws IOException, MalformedRecordException { - final File jsonFile = new File(jsonPath); - try ( - InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile)) - ) { - RecordSchema schema = inferSchema(jsonStream, StartingFieldStrategy.ROOT_NODE, null); + private void testReadRecords(String jsonFilename, List<Object> expected) throws IOException, MalformedRecordException { + final File jsonFile = new File(jsonFilename); + try (final InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) { + final RecordSchema schema = inferSchema(jsonStream, StartingFieldStrategy.ROOT_NODE, null); testReadRecords(jsonStream, schema, expected); } } @@ -1360,9 +1199,7 @@ class TestJsonTreeRowRecordReader { List<Object> expected, StartingFieldStrategy strategy, String startingFieldName, - SchemaApplicationStrategy schemaApplicationStrategy - ) throws IOException, MalformedRecordException { - + SchemaApplicationStrategy schemaApplicationStrategy) throws IOException, MalformedRecordException { final File jsonFile = new File(jsonPath); try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) { testReadRecords(jsonStream, schema, expected, strategy, startingFieldName, schemaApplicationStrategy); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java index 26493c4daf..c49b54e19d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java @@ -662,150 +662,6 @@ class TestYamlTreeRowRecordReader { testReadRecords(yamlPath, expected); } - @Test - void testChoiceOfEmbeddedArraysAndSingleRecords() throws Exception { - String yamlPath = "src/test/resources/yaml/choice-of-embedded-arrays-and-single-records.yaml"; - - final SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Collections.singletonList( - new RecordField("integer", RecordFieldType.INT.getDataType()) - )); - final SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) - )); - final SimpleRecordSchema expectedRecordSchema3 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("string", RecordFieldType.STRING.getDataType()) - )); - final SimpleRecordSchema expectedRecordSchema4 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("string", RecordFieldType.STRING.getDataType()) - )); - RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Collections.singletonList( - new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType( - RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1), - RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3), - RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)), - RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4)) - )) - )); - - List<Object> expected = Arrays.asList( - new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{ - put("record", new MapRecord(expectedRecordSchema1, new HashMap<>() {{ - put("integer", 1); - }})); - }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{ - put("record", new Object[]{ - new MapRecord(expectedRecordSchema2, new HashMap<>() {{ - put("integer", 21); - put("boolean", true); - }}), - new MapRecord(expectedRecordSchema2, new HashMap<>() {{ - put("integer", 22); - put("boolean", false); - }}) - }); - }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{ - put("record", new MapRecord(expectedRecordSchema3, new HashMap<>() {{ - put("integer", 3); - put("string", "stringValue3"); - }})); - }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{ - put("record", new Object[]{ - new MapRecord(expectedRecordSchema4, new HashMap<>() {{ - put("integer", 41); - put("string", "stringValue41"); - }}), - new MapRecord(expectedRecordSchema4, new HashMap<>() {{ - put("integer", 42); - put("string", "stringValue42"); - }}) - }); - }}) - ); - - testReadRecords(yamlPath, expected); - } - - @Test - void testChoiceOfMergedEmbeddedArraysAndSingleRecords() throws Exception { - String yamlPath = "src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml"; - - final SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) - )); - final SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) - )); - final SimpleRecordSchema expectedRecordSchema3 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("string", RecordFieldType.STRING.getDataType()) - )); - final SimpleRecordSchema expectedRecordSchema4 = new SimpleRecordSchema(Arrays.asList( - new RecordField("integer", RecordFieldType.INT.getDataType()), - new RecordField("string", RecordFieldType.STRING.getDataType()), - new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()) - )); - RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Collections.singletonList( - new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType( - RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1), - RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3), - RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)), - RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4)) - )) - )); - - List<Object> expected = Arrays.asList( - new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{ - put("record", new MapRecord(expectedRecordSchema1, new HashMap<>() {{ - put("integer", 1); - put("boolean", false); - }})); - }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{ - put("record", new Object[]{ - new MapRecord(expectedRecordSchema2, new HashMap<>() {{ - put("integer", 21); - put("boolean", true); - }}), - new MapRecord(expectedRecordSchema2, new HashMap<>() {{ - put("integer", 22); - put("boolean", false); - }}) - }); - }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{ - put("record", new MapRecord(expectedRecordSchema3, new HashMap<>() {{ - put("integer", 3); - put("string", "stringValue3"); - }})); - }}), - new MapRecord(expectedRecordChoiceSchema, new HashMap<>() {{ - put("record", new Object[]{ - new MapRecord(expectedRecordSchema4, new HashMap<>() {{ - put("integer", 41); - put("string", "stringValue41"); - }}), - new MapRecord(expectedRecordSchema4, new HashMap<>() {{ - put("integer", 42); - put("string", "stringValue42"); - }}), - new MapRecord(expectedRecordSchema4, new HashMap<>() {{ - put("integer", 43); - put("boolean", false); - }}) - }); - }}) - ); - - testReadRecords(yamlPath, expected); - } @Test void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json deleted file mode 100644 index e1529518ae..0000000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json +++ /dev/null @@ -1,40 +0,0 @@ -{ - "dataCollection":[ - { - "record": { - "integer": 1, - "boolean": false - } - }, - { - "record": [{ - "integer": 21, - "boolean": true - }, - { - "integer": 22, - "boolean": false - }] - }, - { - "record": { - "integer": 3, - "string": "stringValue3" - } - }, - { - "record": [{ - "integer": 41, - "string": "stringValue41" - }, - { - "integer": 42, - "string": "stringValue42" - }, - { - "integer": 43, - "boolean": false - }] - } - ] -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml deleted file mode 100644 index 603d3ddbbb..0000000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/yaml/choice-of-merged-embedded-arrays-and-single-records.yaml +++ /dev/null @@ -1,19 +0,0 @@ -dataCollection: - - record: - integer: 1 - boolean: false - - record: - - integer: 21 - boolean: true - - integer: 22 - boolean: false - - record: - integer: 3 - string: stringValue3 - - record: - - integer: 41 - string: stringValue41 - - integer: 42 - string: stringValue42 - - integer: 43 - boolean: false