This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c68f6910ba NIFI-13843 Default to Drop Unknown Fields in Record Readers (#9347)
c68f6910ba is described below

commit c68f6910baed5167ecc85cd78f75f9696de42df5
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Oct 29 20:18:47 2024 +0100

    NIFI-13843 Default to Drop Unknown Fields in Record Readers (#9347)
    
    Co-authored-by: David Handermann <[email protected]>
    Signed-off-by: David Handermann <[email protected]>
---
 .../apache/nifi/serialization/RecordReader.java    |  2 +-
 .../nifi/serialization/record/MapRecord.java       |  4 ---
 .../apache/nifi/json/JsonTreeRowRecordReader.java  | 19 ++++++++++++-
 .../nifi-standard-processors/pom.xml               |  1 +
 .../processors/standard/TestConvertRecord.java     | 33 ++++++++++++++++++++++
 .../TestConvertRecord/input/person_dropfield.json  |  9 ++++++
 6 files changed, 62 insertions(+), 6 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
index 1346e349b8..5412c73d23 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordReader.java
@@ -47,7 +47,7 @@ public interface RecordReader extends Closeable {
      * @throws SchemaValidationException if a Record contains a field that 
violates the schema and cannot be coerced into the appropriate field type.
      */
     default Record nextRecord() throws IOException, MalformedRecordException {
-        return nextRecord(true, false);
+        return nextRecord(true, true);
     }
 
     /**
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index 359d574990..d04ac71d95 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -558,10 +558,6 @@ public class MapRecord implements Record {
     private Optional<RecordField> setValueAndGetField(final String fieldName, 
final Object value) {
         final Optional<RecordField> field = getSchema().getField(fieldName);
         if (field.isEmpty()) {
-            if (dropUnknownFields) {
-                return field;
-            }
-
             updateValue(fieldName, value);
             return field;
         }
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index a8dc6c4e3a..12bc6b0878 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.nifi.json;
 import com.fasterxml.jackson.core.StreamReadConstraints;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
+
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -147,8 +148,22 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
                                            final boolean coerceTypes, final 
boolean dropUnknown) throws IOException, MalformedRecordException {
 
         final Map<String, Object> values = new 
LinkedHashMap<>(schema.getFieldCount() * 2);
+        final JsonNode jsonNodeForSerialization;
 
         if (dropUnknown) {
+            jsonNodeForSerialization = jsonNode.deepCopy();
+
+            // Delete unknown fields for updated serialized representation
+            final Iterator<Map.Entry<String, JsonNode>> fields = 
jsonNodeForSerialization.fields();
+            while (fields.hasNext()) {
+                final Map.Entry<String, JsonNode> field = fields.next();
+                final String fieldName = field.getKey();
+                final Optional<RecordField> recordField = 
schema.getField(fieldName);
+                if (recordField.isEmpty()) {
+                    fields.remove();
+                }
+            }
+
             for (final RecordField recordField : schema.getFields()) {
                 final JsonNode childNode = getChildNode(jsonNode, recordField);
                 if (childNode == null) {
@@ -169,6 +184,8 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
                 values.put(fieldName, value);
             }
         } else {
+            jsonNodeForSerialization = jsonNode;
+
             final Iterator<String> fieldNames = jsonNode.fieldNames();
             while (fieldNames.hasNext()) {
                 final String fieldName = fieldNames.next();
@@ -189,7 +206,7 @@ public class JsonTreeRowRecordReader extends 
AbstractJsonRowRecordReader {
             }
         }
 
-        final Supplier<String> supplier = jsonNode::toString;
+        final Supplier<String> supplier = jsonNodeForSerialization::toString;
         return new MapRecord(schema, values, SerializedForm.of(supplier, 
"application/json"), false, dropUnknown);
     }
 
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 8cea933f2e..e0cdb78743 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -625,6 +625,7 @@
                         
<exclude>src/test/resources/TestConvertRecord/input/person_bad_enum.json</exclude>
                         
<exclude>src/test/resources/TestConvertRecord/input/person_long_id.json</exclude>
                         
<exclude>src/test/resources/TestConvertRecord/input/person.json</exclude>
+                        
<exclude>src/test/resources/TestConvertRecord/input/person_dropfield.json</exclude>
                         
<exclude>src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc</exclude>
                         
<exclude>src/test/resources/TestConvertRecord/schema/person.avsc</exclude>
                         
<exclude>src/test/resources/TestCountText/jabberwocky.txt</exclude>
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
index 3d7f6b8030..897effa236 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -51,6 +51,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @DisabledOnOs(value = OS.WINDOWS, disabledReason = "Pretty-printing is not 
portable across operating systems")
@@ -404,4 +405,36 @@ public class TestConvertRecord {
             }
         }
     }
+
+    @Test
+    public void testJSONDroppingUnkownFields() throws InitializationException, 
IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertRecord.class);
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+
+        final String inputSchemaText = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
+        final String outputSchemaText = new 
String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
+
+        runner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, 
inputSchemaText);
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", jsonWriter);
+        runner.setProperty(jsonWriter, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, 
outputSchemaText);
+        runner.setProperty(jsonWriter, "Schema Write Strategy", 
"full-schema-attribute");
+        runner.enableControllerService(jsonWriter);
+
+        
runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_dropfield.json"));
+
+        runner.setProperty(ConvertRecord.RECORD_READER, "reader");
+        runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
+        assertFalse(new 
String(flowFile.toByteArray()).contains("fieldThatShouldBeRemoved"));
+    }
 }
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person_dropfield.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person_dropfield.json
new file mode 100644
index 0000000000..50f955f5d2
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person_dropfield.json
@@ -0,0 +1,9 @@
+[ {
+  "id" : 485,
+  "name" : {
+    "last" : "Doe",
+    "first" : "John"
+  },
+  "status" : "ACTIVE",
+  "fieldThatShouldBeRemoved": "Test"
+} ]
\ No newline at end of file

Reply via email to