Copilot commented on code in PR #2004:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2004#discussion_r2273807978


##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -58,8 +69,57 @@ void ConsumeMQTT::readProperties(core::ProcessContext& 
context) {
   receive_maximum_ = gsl::narrow<uint16_t>(utils::parseU64Property(context, 
ReceiveMaximum));
 }
 
-void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, core::ProcessSession& 
session) {
-  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
+void ConsumeMQTT::addAttributesAsRecordFields(core::RecordSet& new_records, 
const std::queue<SmartMessage>& msg_queue) const {
+  if (!add_attributes_as_fields_) {
+    return;
+  }
+
+  for (auto& record: new_records) {

Review Comment:
   [nitpick] Missing space after colon in range-based for loop. Should be 'for 
(auto& record : new_records)' for consistency with C++ style conventions.
   ```suggestion
     for (auto& record : new_records) {
   ```



##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -58,8 +69,57 @@ void ConsumeMQTT::readProperties(core::ProcessContext& 
context) {
   receive_maximum_ = gsl::narrow<uint16_t>(utils::parseU64Property(context, 
ReceiveMaximum));
 }
 
-void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, core::ProcessSession& 
session) {
-  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
+void ConsumeMQTT::addAttributesAsRecordFields(core::RecordSet& new_records, 
const std::queue<SmartMessage>& msg_queue) const {
+  if (!add_attributes_as_fields_) {
+    return;
+  }
+
+  for (auto& record: new_records) {
+    record.emplace("_topic", core::RecordField(msg_queue.front().topic));
+    auto topic_segments = utils::string::split(msg_queue.front().topic, "/");
+    core::RecordArray topic_segments_array;
+    for (size_t i = 0; i < topic_segments.size(); ++i) {
+      topic_segments_array.emplace_back(core::RecordField(topic_segments[i]));
+    }
+    record.emplace("_topicSegments", 
core::RecordField(std::move(topic_segments_array)));
+    record.emplace("_qos", core::RecordField(msg_queue.front().contents->qos));
+    record.emplace("_isDuplicate", 
core::RecordField(msg_queue.front().contents->dup > 0));
+    record.emplace("_isRetained", 
core::RecordField(msg_queue.front().contents->retained > 0));

Review Comment:
   This code assumes all records in the RecordSet correspond to the same MQTT 
message, but it only uses `msg_queue.front()` for all records. If multiple 
records are created from a single MQTT message, this is correct, but the loop 
structure suggests this might be processing records from different messages 
incorrectly.
   ```suggestion
   void ConsumeMQTT::addAttributesAsRecordFields(core::RecordSet& new_records, 
const SmartMessage& message) const {
     if (!add_attributes_as_fields_) {
       return;
     }
   
     for (auto& record: new_records) {
       record.emplace("_topic", core::RecordField(message.topic));
       auto topic_segments = utils::string::split(message.topic, "/");
       core::RecordArray topic_segments_array;
       for (size_t i = 0; i < topic_segments.size(); ++i) {
         
topic_segments_array.emplace_back(core::RecordField(topic_segments[i]));
       }
       record.emplace("_topicSegments", 
core::RecordField(std::move(topic_segments_array)));
       record.emplace("_qos", core::RecordField(message.contents->qos));
       record.emplace("_isDuplicate", core::RecordField(message.contents->dup > 
0));
       record.emplace("_isRetained", 
core::RecordField(message.contents->retained > 0));
   ```



##########
extensions/mqtt/tests/PublishMQTTTests.cpp:
##########
@@ -102,3 +124,46 @@ TEST_CASE_METHOD(Fixture, "PublishMQTT can publish the 
number of in-flight messa
     CHECK(it->value == 0.0);
   }
 }
+
+TEST_CASE_METHOD(PublishMQTTTestFixture, "Test sending XML message records", 
"[publishMQTTTest]") {
+  test_controller_.plan->addController("JsonTreeReader", "JsonTreeReader");
+  auto xml_writer = test_controller_.plan->addController("XMLRecordSetWriter", 
"XMLRecordSetWriter");
+  REQUIRE(test_controller_.plan->setProperty(xml_writer, 
minifi::standard::XMLRecordSetWriter::NameOfRootTag.name, "root"));
+  REQUIRE(test_controller_.plan->setProperty(xml_writer, 
minifi::standard::XMLRecordSetWriter::NameOfRecordTag.name, "record"));
+
+  
REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::PublishMQTT::Topic.name,
 "mytopic"));
+  
REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
+  
REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::PublishMQTT::RecordReader.name,
 "JsonTreeReader"));
+  
REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::PublishMQTT::RecordWriter.name,
 "XMLRecordSetWriter"));
+
+  const auto trigger_results = test_controller_.trigger(R"([{"element1": 
"value1"}, {"element2": "42"}])");
+  CHECK(trigger_results.at(TestPublishMQTTProcessor::Success).size() == 2);
+  const auto flow_file_1 = 
trigger_results.at(TestPublishMQTTProcessor::Success).at(0);
+
+  auto string_content = test_controller_.plan->getContent(flow_file_1);
+  CHECK(string_content == R"(<?xml 
version="1.0"?><root><record><element1>value1</element1></record></root>)");
+
+  const auto flow_file_2 = 
trigger_results.at(TestPublishMQTTProcessor::Success).at(1);
+  string_content = test_controller_.plan->getContent(flow_file_2);
+  CHECK(string_content == R"(<?xml 
version="1.0"?><root><record><element2>42</element2></record></root>)");
+}
+
+TEST_CASE_METHOD(PublishMQTTTestFixture, "Test scheduling failure if 
non-existant recordset reader or writer is set", "[publishMQTTTest]") {

Review Comment:
   Spelling error: 'non-existant' should be 'non-existent'.
   ```suggestion
   TEST_CASE_METHOD(PublishMQTTTestFixture, "Test scheduling failure if 
non-existent recordset reader or writer is set", "[publishMQTTTest]") {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to