exceptionfactory commented on code in PR #11158:
URL: https://github.com/apache/nifi/pull/11158#discussion_r3105637325
##########
nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java:
##########
@@ -631,4 +632,129 @@ void testChoiceArrayOfStringsOrArrayOfRecords() throws
IOException {
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(json, output);
}
+
+ @Test
+ void testReuseInputSerializationDefaultTrueUsesFastPath() throws
IOException {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("name",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ final Map<String, Object> values = new HashMap<>();
+ values.put("name", "John Doe");
+ values.put("age", 42);
+
+ final String rawForm = "{\"name\":\"John
Doe\",\"age\":42,\"ignoredExtra\":\"preserved\"}";
+ final SerializedForm serializedForm = SerializedForm.of(rawForm,
"application/json");
+ final Record record = new MapRecord(schema, values, serializedForm);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (final WriteJsonResult writer = new
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new
SchemaNameAsAttribute(), baos, false,
+ NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY,
RecordFieldType.DATE.getDefaultFormat(),
+ RecordFieldType.TIME.getDefaultFormat(),
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+ writer.write(RecordSet.of(schema, record));
+ }
+
+ final String output = baos.toString(StandardCharsets.UTF_8);
+ assertTrue(output.contains("\"ignoredExtra\":\"preserved\""),
+ "Default constructor preserves legacy fast-path behavior: raw
bytes should be emitted verbatim");
+ }
+
+ @Test
+ void testReuseInputSerializationFalseForcesReserialization() throws
IOException {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("name",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ final Map<String, Object> values = new HashMap<>();
+ values.put("name", "John Doe");
+ values.put("age", 42);
+
+ final String rawForm = "{\"name\":\"John
Doe\",\"age\":42,\"ignoredExtra\":\"preserved\"}";
+ final SerializedForm serializedForm = SerializedForm.of(rawForm,
"application/json");
+ final Record record = new MapRecord(schema, values, serializedForm);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (final WriteJsonResult writer = new
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new
SchemaNameAsAttribute(), baos, false,
+ NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY,
RecordFieldType.DATE.getDefaultFormat(),
+ RecordFieldType.TIME.getDefaultFormat(),
RecordFieldType.TIMESTAMP.getDefaultFormat(),
+ "application/json", false, false)) {
+ writer.write(RecordSet.of(schema, record));
+ }
+
+ final String output = baos.toString(StandardCharsets.UTF_8);
+ assertFalse(output.contains("ignoredExtra"),
+ "When Reuse Input Serialization is false, the writer must
re-serialize from typed values and ignore raw bytes");
+ assertEquals("[{\"name\":\"John Doe\",\"age\":42}]", output);
+ }
+
+ @Test
+ void testReuseInputSerializationFalseHonorsTimestampFormat() throws
IOException {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("event",
RecordFieldType.TIMESTAMP.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ final Timestamp eventTimestamp = Timestamp.valueOf("2025-03-20
17:33:11.000");
+ final Map<String, Object> values = new HashMap<>();
+ values.put("event", eventTimestamp);
+
+ final String rawForm = "{\"event\":\"2025-03-20T17:33:11.000+0000\"}";
Review Comment:
It would be helpful to declare the timestamp string as a local variable and use String
formatting to build the raw JSON here.
##########
nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java:
##########
@@ -631,4 +632,129 @@ void testChoiceArrayOfStringsOrArrayOfRecords() throws
IOException {
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(json, output);
}
+
+ @Test
+ void testReuseInputSerializationDefaultTrueUsesFastPath() throws
IOException {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("name",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ final Map<String, Object> values = new HashMap<>();
+ values.put("name", "John Doe");
+ values.put("age", 42);
+
+ final String rawForm = "{\"name\":\"John
Doe\",\"age\":42,\"ignoredExtra\":\"preserved\"}";
+ final SerializedForm serializedForm = SerializedForm.of(rawForm,
"application/json");
+ final Record record = new MapRecord(schema, values, serializedForm);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (final WriteJsonResult writer = new
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new
SchemaNameAsAttribute(), baos, false,
+ NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY,
RecordFieldType.DATE.getDefaultFormat(),
+ RecordFieldType.TIME.getDefaultFormat(),
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+ writer.write(RecordSet.of(schema, record));
+ }
+
+ final String output = baos.toString(StandardCharsets.UTF_8);
+ assertTrue(output.contains("\"ignoredExtra\":\"preserved\""),
+ "Default constructor preserves legacy fast-path behavior: raw
bytes should be emitted verbatim");
+ }
+
+ @Test
+ void testReuseInputSerializationFalseForcesReserialization() throws
IOException {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("name",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ final Map<String, Object> values = new HashMap<>();
+ values.put("name", "John Doe");
+ values.put("age", 42);
+
+ final String rawForm = "{\"name\":\"John
Doe\",\"age\":42,\"ignoredExtra\":\"preserved\"}";
+ final SerializedForm serializedForm = SerializedForm.of(rawForm,
"application/json");
+ final Record record = new MapRecord(schema, values, serializedForm);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (final WriteJsonResult writer = new
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new
SchemaNameAsAttribute(), baos, false,
+ NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY,
RecordFieldType.DATE.getDefaultFormat(),
+ RecordFieldType.TIME.getDefaultFormat(),
RecordFieldType.TIMESTAMP.getDefaultFormat(),
+ "application/json", false, false)) {
+ writer.write(RecordSet.of(schema, record));
+ }
+
+ final String output = baos.toString(StandardCharsets.UTF_8);
+ assertFalse(output.contains("ignoredExtra"),
+ "When Reuse Input Serialization is false, the writer must
re-serialize from typed values and ignore raw bytes");
+ assertEquals("[{\"name\":\"John Doe\",\"age\":42}]", output);
+ }
+
+ @Test
+ void testReuseInputSerializationFalseHonorsTimestampFormat() throws
IOException {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("event",
RecordFieldType.TIMESTAMP.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ final Timestamp eventTimestamp = Timestamp.valueOf("2025-03-20
17:33:11.000");
+ final Map<String, Object> values = new HashMap<>();
+ values.put("event", eventTimestamp);
+
+ final String rawForm = "{\"event\":\"2025-03-20T17:33:11.000+0000\"}";
+ final SerializedForm serializedForm = SerializedForm.of(rawForm,
"application/json");
+ final Record record = new MapRecord(schema, values, serializedForm);
+
+ final ByteArrayOutputStream fastPathBaos = new ByteArrayOutputStream();
+ try (final WriteJsonResult writer = new
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new
SchemaNameAsAttribute(), fastPathBaos, false,
+ NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY,
RecordFieldType.DATE.getDefaultFormat(),
+ RecordFieldType.TIME.getDefaultFormat(),
"yyyy-MM-dd'T'HH:mm:ss.SSSX",
Review Comment:
It would be helpful to declare the date format as a local variable in this method and use it in
both places.
##########
nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java:
##########
@@ -123,13 +123,28 @@ public class JsonRecordSetWriter extends
DateTimeTextRecordSetWriter implements
.allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
.dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_GZIP)
.build();
+ public static final PropertyDescriptor REUSE_INPUT_SERIALIZATION = new
PropertyDescriptor.Builder()
+ .name("Reuse Input Serialization")
Review Comment:
This is a minor naming point, but perhaps `Use Input Serialization` would be a better property name?
##########
nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java:
##########
@@ -631,4 +632,129 @@ void testChoiceArrayOfStringsOrArrayOfRecords() throws
IOException {
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(json, output);
}
+
+ @Test
+ void testReuseInputSerializationDefaultTrueUsesFastPath() throws
IOException {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("name",
RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ final Map<String, Object> values = new HashMap<>();
+ values.put("name", "John Doe");
+ values.put("age", 42);
+
+ final String rawForm = "{\"name\":\"John
Doe\",\"age\":42,\"ignoredExtra\":\"preserved\"}";
+ final SerializedForm serializedForm = SerializedForm.of(rawForm,
"application/json");
+ final Record record = new MapRecord(schema, values, serializedForm);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (final WriteJsonResult writer = new
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new
SchemaNameAsAttribute(), baos, false,
+ NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY,
RecordFieldType.DATE.getDefaultFormat(),
+ RecordFieldType.TIME.getDefaultFormat(),
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+ writer.write(RecordSet.of(schema, record));
+ }
+
+ final String output = baos.toString(StandardCharsets.UTF_8);
+ assertTrue(output.contains("\"ignoredExtra\":\"preserved\""),
+ "Default constructor preserves legacy fast-path behavior: raw
bytes should be emitted verbatim");
Review Comment:
Should the test assert that the output equals the expected string, so that the full strings can be compared,
instead of checking for `contains()`?
##########
nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java:
##########
@@ -123,13 +123,28 @@ public class JsonRecordSetWriter extends
DateTimeTextRecordSetWriter implements
.allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
.dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_GZIP)
.build();
+ public static final PropertyDescriptor REUSE_INPUT_SERIALIZATION = new
PropertyDescriptor.Builder()
+ .name("Reuse Input Serialization")
+ .description("""
+ Controls a throughput optimization that only applies to
pure JSON pass-through flows. When set to true, and all of the \
+ following conditions are met, the writer will emit the
record's original JSON bytes verbatim instead of re-serializing from typed
field \
+ values: (1) the upstream reader is JsonTreeReader, (2) no
data change, (3) the reader's and writer's record schemas are identical, \
Review Comment:
Would the `JsonPathReader` also work with this optimization? The details are helpful, but I also
wonder whether the property description needs to be this detailed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]