This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c68f6910ba NIFI-13843 Default to Drop Unknown Fields in Record Readers (#9347)
c68f6910ba is described below
commit c68f6910baed5167ecc85cd78f75f9696de42df5
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Oct 29 20:18:47 2024 +0100
NIFI-13843 Default to Drop Unknown Fields in Record Readers (#9347)
Co-authored-by: David Handermann <[email protected]>
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/serialization/RecordReader.java | 2 +-
.../nifi/serialization/record/MapRecord.java | 4 ---
.../apache/nifi/json/JsonTreeRowRecordReader.java | 19 ++++++++++++-
.../nifi-standard-processors/pom.xml | 1 +
.../processors/standard/TestConvertRecord.java | 33 ++++++++++++++++++++++
.../TestConvertRecord/input/person_dropfield.json | 9 ++++++
6 files changed, 62 insertions(+), 6 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
index 1346e349b8..5412c73d23 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
@@ -47,7 +47,7 @@ public interface RecordReader extends Closeable {
* @throws SchemaValidationException if a Record contains a field that
violates the schema and cannot be coerced into the appropriate field type.
*/
default Record nextRecord() throws IOException, MalformedRecordException {
- return nextRecord(true, false);
+ return nextRecord(true, true);
}
/**
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index 359d574990..d04ac71d95 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -558,10 +558,6 @@ public class MapRecord implements Record {
private Optional<RecordField> setValueAndGetField(final String fieldName,
final Object value) {
final Optional<RecordField> field = getSchema().getField(fieldName);
if (field.isEmpty()) {
- if (dropUnknownFields) {
- return field;
- }
-
updateValue(fieldName, value);
return field;
}
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index a8dc6c4e3a..12bc6b0878 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.nifi.json;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
+
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -147,8 +148,22 @@ public class JsonTreeRowRecordReader extends
AbstractJsonRowRecordReader {
final boolean coerceTypes, final
boolean dropUnknown) throws IOException, MalformedRecordException {
final Map<String, Object> values = new
LinkedHashMap<>(schema.getFieldCount() * 2);
+ final JsonNode jsonNodeForSerialization;
if (dropUnknown) {
+ jsonNodeForSerialization = jsonNode.deepCopy();
+
+ // Delete unknown fields for updated serialized representation
+ final Iterator<Map.Entry<String, JsonNode>> fields =
jsonNodeForSerialization.fields();
+ while (fields.hasNext()) {
+ final Map.Entry<String, JsonNode> field = fields.next();
+ final String fieldName = field.getKey();
+ final Optional<RecordField> recordField =
schema.getField(fieldName);
+ if (recordField.isEmpty()) {
+ fields.remove();
+ }
+ }
+
for (final RecordField recordField : schema.getFields()) {
final JsonNode childNode = getChildNode(jsonNode, recordField);
if (childNode == null) {
@@ -169,6 +184,8 @@ public class JsonTreeRowRecordReader extends
AbstractJsonRowRecordReader {
values.put(fieldName, value);
}
} else {
+ jsonNodeForSerialization = jsonNode;
+
final Iterator<String> fieldNames = jsonNode.fieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
@@ -189,7 +206,7 @@ public class JsonTreeRowRecordReader extends
AbstractJsonRowRecordReader {
}
}
- final Supplier<String> supplier = jsonNode::toString;
+ final Supplier<String> supplier = jsonNodeForSerialization::toString;
return new MapRecord(schema, values, SerializedForm.of(supplier,
"application/json"), false, dropUnknown);
}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 8cea933f2e..e0cdb78743 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -625,6 +625,7 @@
<exclude>src/test/resources/TestConvertRecord/input/person_bad_enum.json</exclude>
<exclude>src/test/resources/TestConvertRecord/input/person_long_id.json</exclude>
<exclude>src/test/resources/TestConvertRecord/input/person.json</exclude>
+ <exclude>src/test/resources/TestConvertRecord/input/person_dropfield.json</exclude>
<exclude>src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc</exclude>
<exclude>src/test/resources/TestConvertRecord/schema/person.avsc</exclude>
<exclude>src/test/resources/TestCountText/jabberwocky.txt</exclude>
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
index 3d7f6b8030..897effa236 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -51,6 +51,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Pretty-printing is not
portable across operating systems")
@@ -404,4 +405,36 @@ public class TestConvertRecord {
}
}
}
+
+ @Test
+ public void testJSONDroppingUnkownFields() throws InitializationException,
IOException {
+ final TestRunner runner =
TestRunners.newTestRunner(ConvertRecord.class);
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("reader", jsonReader);
+
+ final String inputSchemaText = new
String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
+ final String outputSchemaText = new
String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
+
+ runner.setProperty(jsonReader,
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT,
inputSchemaText);
+ runner.enableControllerService(jsonReader);
+
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ runner.addControllerService("writer", jsonWriter);
+ runner.setProperty(jsonWriter,
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT,
outputSchemaText);
+ runner.setProperty(jsonWriter, "Schema Write Strategy",
"full-schema-attribute");
+ runner.enableControllerService(jsonWriter);
+
+ runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_dropfield.json"));
+
+ runner.setProperty(ConvertRecord.RECORD_READER, "reader");
+ runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
+ assertFalse(new
String(flowFile.toByteArray()).contains("fieldThatShouldBeRemoved"));
+ }
}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person_dropfield.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person_dropfield.json
new file mode 100644
index 0000000000..50f955f5d2
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person_dropfield.json
@@ -0,0 +1,9 @@
+[ {
+ "id" : 485,
+ "name" : {
+ "last" : "Doe",
+ "first" : "John"
+ },
+ "status" : "ACTIVE",
+ "fieldThatShouldBeRemoved": "Test"
+} ]
\ No newline at end of file