This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new eecb6bfb38 NIFI-10650 Fix demarcator is appended to the end of the FlowFile's content by ConsumeMQTT eecb6bfb38 is described below commit eecb6bfb3842c72727dd65a463b396d567a06e50 Author: Nandor Soma Abonyi <abonyis...@gmail.com> AuthorDate: Fri Oct 14 17:19:24 2022 +0200 NIFI-10650 Fix demarcator is appended to the end of the FlowFile's content by ConsumeMQTT This closes #6534. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java | 4 +++- .../test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java index 849cb17324..c3c299905f 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java @@ -431,8 +431,10 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { int i = 0; while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) { final ReceivedMqttMessage mqttMessage = mqttQueue.poll(); + if (i > 0) { + out.write(demarcator); + } out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload()); - out.write(demarcator); session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false); i++; } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java index af830f93d2..9a004af7e2 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java @@ -516,7 +516,7 @@ public class TestConsumeMQTT { assertEquals(flowFiles.size(), 1); assertEquals("{\"name\":\"Apache NiFi\"}\\n" + THIS_IS_NOT_JSON + "\\n" - + "{\"name\":\"Apache NiFi\"}\\n", + + "{\"name\":\"Apache NiFi\"}", new String(flowFiles.get(0).toByteArray())); final List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);