Copilot commented on code in PR #2004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2004#discussion_r2273807978
##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -58,8 +69,57 @@ void ConsumeMQTT::readProperties(core::ProcessContext&
context) {
receive_maximum_ = gsl::narrow<uint16_t>(utils::parseU64Property(context,
ReceiveMaximum));
}
-void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, core::ProcessSession&
session) {
- std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
+void ConsumeMQTT::addAttributesAsRecordFields(core::RecordSet& new_records,
const std::queue<SmartMessage>& msg_queue) const {
+ if (!add_attributes_as_fields_) {
+ return;
+ }
+
+ for (auto& record: new_records) {
Review Comment:
[nitpick] Missing space before the colon in the range-based for loop. Should be
'for (auto& record : new_records)' for consistency with C++ style conventions.
```suggestion
for (auto& record : new_records) {
```
##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -58,8 +69,57 @@ void ConsumeMQTT::readProperties(core::ProcessContext&
context) {
receive_maximum_ = gsl::narrow<uint16_t>(utils::parseU64Property(context,
ReceiveMaximum));
}
-void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, core::ProcessSession&
session) {
- std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
+void ConsumeMQTT::addAttributesAsRecordFields(core::RecordSet& new_records,
const std::queue<SmartMessage>& msg_queue) const {
+ if (!add_attributes_as_fields_) {
+ return;
+ }
+
+ for (auto& record: new_records) {
+ record.emplace("_topic", core::RecordField(msg_queue.front().topic));
+ auto topic_segments = utils::string::split(msg_queue.front().topic, "/");
+ core::RecordArray topic_segments_array;
+ for (size_t i = 0; i < topic_segments.size(); ++i) {
+ topic_segments_array.emplace_back(core::RecordField(topic_segments[i]));
+ }
+ record.emplace("_topicSegments",
core::RecordField(std::move(topic_segments_array)));
+ record.emplace("_qos", core::RecordField(msg_queue.front().contents->qos));
+ record.emplace("_isDuplicate",
core::RecordField(msg_queue.front().contents->dup > 0));
+ record.emplace("_isRetained",
core::RecordField(msg_queue.front().contents->retained > 0));
Review Comment:
This code uses `msg_queue.front()` for every record in the RecordSet, so it
assumes that all records correspond to the same MQTT message. If multiple
records are created from a single MQTT message, this is correct, but the loop
structure and the queue parameter suggest it might be handed records from
different messages, in which case every record would incorrectly receive the
attributes of the first message.
```suggestion
void ConsumeMQTT::addAttributesAsRecordFields(core::RecordSet& new_records,
const SmartMessage& message) const {
if (!add_attributes_as_fields_) {
return;
}
for (auto& record: new_records) {
record.emplace("_topic", core::RecordField(message.topic));
auto topic_segments = utils::string::split(message.topic, "/");
core::RecordArray topic_segments_array;
for (size_t i = 0; i < topic_segments.size(); ++i) {
topic_segments_array.emplace_back(core::RecordField(topic_segments[i]));
}
record.emplace("_topicSegments",
core::RecordField(std::move(topic_segments_array)));
record.emplace("_qos", core::RecordField(message.contents->qos));
record.emplace("_isDuplicate", core::RecordField(message.contents->dup >
0));
record.emplace("_isRetained",
core::RecordField(message.contents->retained > 0));
```
##########
extensions/mqtt/tests/PublishMQTTTests.cpp:
##########
@@ -102,3 +124,46 @@ TEST_CASE_METHOD(Fixture, "PublishMQTT can publish the
number of in-flight messa
CHECK(it->value == 0.0);
}
}
+
+TEST_CASE_METHOD(PublishMQTTTestFixture, "Test sending XML message records",
"[publishMQTTTest]") {
+ test_controller_.plan->addController("JsonTreeReader", "JsonTreeReader");
+ auto xml_writer = test_controller_.plan->addController("XMLRecordSetWriter",
"XMLRecordSetWriter");
+ REQUIRE(test_controller_.plan->setProperty(xml_writer,
minifi::standard::XMLRecordSetWriter::NameOfRootTag.name, "root"));
+ REQUIRE(test_controller_.plan->setProperty(xml_writer,
minifi::standard::XMLRecordSetWriter::NameOfRecordTag.name, "record"));
+
+
REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::PublishMQTT::Topic.name,
"mytopic"));
+
REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
"127.0.0.1:1883"));
+
REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::PublishMQTT::RecordReader.name,
"JsonTreeReader"));
+
REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::PublishMQTT::RecordWriter.name,
"XMLRecordSetWriter"));
+
+ const auto trigger_results = test_controller_.trigger(R"([{"element1":
"value1"}, {"element2": "42"}])");
+ CHECK(trigger_results.at(TestPublishMQTTProcessor::Success).size() == 2);
+ const auto flow_file_1 =
trigger_results.at(TestPublishMQTTProcessor::Success).at(0);
+
+ auto string_content = test_controller_.plan->getContent(flow_file_1);
+ CHECK(string_content == R"(<?xml
version="1.0"?><root><record><element1>value1</element1></record></root>)");
+
+ const auto flow_file_2 =
trigger_results.at(TestPublishMQTTProcessor::Success).at(1);
+ string_content = test_controller_.plan->getContent(flow_file_2);
+ CHECK(string_content == R"(<?xml
version="1.0"?><root><record><element2>42</element2></record></root>)");
+}
+
+TEST_CASE_METHOD(PublishMQTTTestFixture, "Test scheduling failure if
non-existant recordset reader or writer is set", "[publishMQTTTest]") {
Review Comment:
Spelling error: 'non-existant' should be 'non-existent'.
```suggestion
TEST_CASE_METHOD(PublishMQTTTestFixture, "Test scheduling failure if
non-existent recordset reader or writer is set", "[publishMQTTTest]") {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]