exceptionfactory commented on code in PR #11158:
URL: https://github.com/apache/nifi/pull/11158#discussion_r3105637325


##########
nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java:
##########
@@ -631,4 +632,129 @@ void testChoiceArrayOfStringsOrArrayOfRecords() throws 
IOException {
         final String output = new String(data, StandardCharsets.UTF_8);
         assertEquals(json, output);
     }
+
+    @Test
+    void testReuseInputSerializationDefaultTrueUsesFastPath() throws 
IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("name", "John Doe");
+        values.put("age", 42);
+
+        final String rawForm = "{\"name\":\"John 
Doe\",\"age\":42,\"ignoredExtra\":\"preserved\"}";
+        final SerializedForm serializedForm = SerializedForm.of(rawForm, 
"application/json");
+        final Record record = new MapRecord(schema, values, serializedForm);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
RecordFieldType.DATE.getDefaultFormat(),
+                RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+            writer.write(RecordSet.of(schema, record));
+        }
+
+        final String output = baos.toString(StandardCharsets.UTF_8);
+        assertTrue(output.contains("\"ignoredExtra\":\"preserved\""),
+                "Default constructor preserves legacy fast-path behavior: raw 
bytes should be emitted verbatim");
+    }
+
+    @Test
+    void testReuseInputSerializationFalseForcesReserialization() throws 
IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("name", "John Doe");
+        values.put("age", 42);
+
+        final String rawForm = "{\"name\":\"John 
Doe\",\"age\":42,\"ignoredExtra\":\"preserved\"}";
+        final SerializedForm serializedForm = SerializedForm.of(rawForm, 
"application/json");
+        final Record record = new MapRecord(schema, values, serializedForm);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
RecordFieldType.DATE.getDefaultFormat(),
+                RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(),
+                "application/json", false, false)) {
+            writer.write(RecordSet.of(schema, record));
+        }
+
+        final String output = baos.toString(StandardCharsets.UTF_8);
+        assertFalse(output.contains("ignoredExtra"),
+                "When Reuse Input Serialization is false, the writer must 
re-serialize from typed values and ignore raw bytes");
+        assertEquals("[{\"name\":\"John Doe\",\"age\":42}]", output);
+    }
+
+    @Test
+    void testReuseInputSerializationFalseHonorsTimestampFormat() throws 
IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("event", 
RecordFieldType.TIMESTAMP.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Timestamp eventTimestamp = Timestamp.valueOf("2025-03-20 
17:33:11.000");
+        final Map<String, Object> values = new HashMap<>();
+        values.put("event", eventTimestamp);
+
+        final String rawForm = "{\"event\":\"2025-03-20T17:33:11.000+0000\"}";

Review Comment:
   It would be helpful to declare the timestamp string as a local variable and use String 
formatting to build `rawForm` from it here.



##########
nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java:
##########
@@ -631,4 +632,129 @@ void testChoiceArrayOfStringsOrArrayOfRecords() throws 
IOException {
         final String output = new String(data, StandardCharsets.UTF_8);
         assertEquals(json, output);
     }
+
+    @Test
+    void testReuseInputSerializationDefaultTrueUsesFastPath() throws 
IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("name", "John Doe");
+        values.put("age", 42);
+
+        final String rawForm = "{\"name\":\"John 
Doe\",\"age\":42,\"ignoredExtra\":\"preserved\"}";
+        final SerializedForm serializedForm = SerializedForm.of(rawForm, 
"application/json");
+        final Record record = new MapRecord(schema, values, serializedForm);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
RecordFieldType.DATE.getDefaultFormat(),
+                RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+            writer.write(RecordSet.of(schema, record));
+        }
+
+        final String output = baos.toString(StandardCharsets.UTF_8);
+        assertTrue(output.contains("\"ignoredExtra\":\"preserved\""),
+                "Default constructor preserves legacy fast-path behavior: raw 
bytes should be emitted verbatim");
+    }
+
+    @Test
+    void testReuseInputSerializationFalseForcesReserialization() throws 
IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("name", "John Doe");
+        values.put("age", 42);
+
+        final String rawForm = "{\"name\":\"John 
Doe\",\"age\":42,\"ignoredExtra\":\"preserved\"}";
+        final SerializedForm serializedForm = SerializedForm.of(rawForm, 
"application/json");
+        final Record record = new MapRecord(schema, values, serializedForm);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
RecordFieldType.DATE.getDefaultFormat(),
+                RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(),
+                "application/json", false, false)) {
+            writer.write(RecordSet.of(schema, record));
+        }
+
+        final String output = baos.toString(StandardCharsets.UTF_8);
+        assertFalse(output.contains("ignoredExtra"),
+                "When Reuse Input Serialization is false, the writer must 
re-serialize from typed values and ignore raw bytes");
+        assertEquals("[{\"name\":\"John Doe\",\"age\":42}]", output);
+    }
+
+    @Test
+    void testReuseInputSerializationFalseHonorsTimestampFormat() throws 
IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("event", 
RecordFieldType.TIMESTAMP.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Timestamp eventTimestamp = Timestamp.valueOf("2025-03-20 
17:33:11.000");
+        final Map<String, Object> values = new HashMap<>();
+        values.put("event", eventTimestamp);
+
+        final String rawForm = "{\"event\":\"2025-03-20T17:33:11.000+0000\"}";
+        final SerializedForm serializedForm = SerializedForm.of(rawForm, 
"application/json");
+        final Record record = new MapRecord(schema, values, serializedForm);
+
+        final ByteArrayOutputStream fastPathBaos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), fastPathBaos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
RecordFieldType.DATE.getDefaultFormat(),
+                RecordFieldType.TIME.getDefaultFormat(), 
"yyyy-MM-dd'T'HH:mm:ss.SSSX",

Review Comment:
   It would be helpful to declare the date format pattern (`yyyy-MM-dd'T'HH:mm:ss.SSSX`) once as a local 
variable in this method and use it in both places.



##########
nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java:
##########
@@ -123,13 +123,28 @@ public class JsonRecordSetWriter extends 
DateTimeTextRecordSetWriter implements
             .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
             .dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_GZIP)
             .build();
+    public static final PropertyDescriptor REUSE_INPUT_SERIALIZATION = new 
PropertyDescriptor.Builder()
+            .name("Reuse Input Serialization")

Review Comment:
   Minor naming point, but perhaps `Use Input Serialization` would be a better property name?



##########
nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java:
##########
@@ -631,4 +632,129 @@ void testChoiceArrayOfStringsOrArrayOfRecords() throws 
IOException {
         final String output = new String(data, StandardCharsets.UTF_8);
         assertEquals(json, output);
     }
+
+    @Test
+    void testReuseInputSerializationDefaultTrueUsesFastPath() throws 
IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("name", "John Doe");
+        values.put("age", 42);
+
+        final String rawForm = "{\"name\":\"John 
Doe\",\"age\":42,\"ignoredExtra\":\"preserved\"}";
+        final SerializedForm serializedForm = SerializedForm.of(rawForm, 
"application/json");
+        final Record record = new MapRecord(schema, values, serializedForm);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false,
+                NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, 
RecordFieldType.DATE.getDefaultFormat(),
+                RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+            writer.write(RecordSet.of(schema, record));
+        }
+
+        final String output = baos.toString(StandardCharsets.UTF_8);
+        assertTrue(output.contains("\"ignoredExtra\":\"preserved\""),
+                "Default constructor preserves legacy fast-path behavior: raw 
bytes should be emitted verbatim");

Review Comment:
   Should the test compare the full output against the expected string with `assertEquals()`, instead of 
checking for `contains()`?



##########
nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java:
##########
@@ -123,13 +123,28 @@ public class JsonRecordSetWriter extends 
DateTimeTextRecordSetWriter implements
             .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
             .dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_GZIP)
             .build();
+    public static final PropertyDescriptor REUSE_INPUT_SERIALIZATION = new 
PropertyDescriptor.Builder()
+            .name("Reuse Input Serialization")
+            .description("""
+                    Controls a throughput optimization that only applies to 
pure JSON pass-through flows. When set to true, and all of the \
+                    following conditions are met, the writer will emit the 
record's original JSON bytes verbatim instead of re-serializing from typed 
field \
+                    values: (1) the upstream reader is JsonTreeReader, (2) no 
data change, (3) the reader's and writer's record schemas are identical, \

Review Comment:
   Would this also work with the `JsonPathReader`? The details are helpful, but I also 
wonder whether the description needs to be this detailed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to