fgerlits commented on code in PR #1920:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1920#discussion_r2081865848


##########
libminifi/src/core/state/nodes/FlowInformation.cpp:
##########
@@ -99,6 +102,30 @@ std::vector<SerializedResponseNode> 
FlowInformation::serialize() {
     serialized.push_back(processorsStatusesNode);
   }
 
+  if (bulletin_store_) {
+    SerializedResponseNode processorBulletinsNode{.name = 
"processorBulletins", .array = true, .collapsible = false};
+    auto bulletins = bulletin_store_->getBulletins(5min);
+    for (const auto& bulletin : bulletins) {
+      processorBulletinsNode.children.push_back({
+        .name = std::to_string(bulletin.id),
+        .collapsible = false,
+        .children = {
+          {.name = "id", .value = bulletin.id},
+          {.name = "timestamp", .value = 
utils::timeutils::getNiFiDateTimeFormat(std::chrono::time_point_cast<std::chrono::seconds>(bulletin.timestamp))},

Review Comment:
   Are we required to truncate timestamps to seconds?  Including milliseconds 
could be useful.



##########
libminifi/include/core/ProcessorConfig.h:
##########
@@ -47,16 +41,11 @@ struct ProcessorConfig {
   std::string schedulingPeriod;
   std::string penalizationPeriod;
   std::string yieldPeriod;
+  std::string bulletinLevel;

Review Comment:
   There is another `ProcessorConfig.h`, at 
`minifi-api/include/minifi-cpp/core/ProcessorConfig.h`. Do we need to add 
`bulletinLevel` there, too? TBH it's not completely clear to me why we have two 
of these...



##########
libminifi/src/core/ProcessGroup.cpp:
##########
@@ -485,4 +487,15 @@ ParameterContext* ProcessGroup::getParameterContext() 
const {
   return parameter_context_;
 }
 
+std::string ProcessGroup::buildGroupPath() const {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::string path = name_;
+  auto parent = parent_process_group_;
+  while (parent != nullptr) {
+    path.insert(0, parent->getName() + " / ");

Review Comment:
   Do we really want to put spaces around the '/'?  There aren't any in the 
path example in BulletinStoreTests.cpp.



##########
libminifi/src/core/BulletinStore.cpp:
##########
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/BulletinStore.h"
+
+#include "core/logging/LoggerBase.h"
+
+namespace org::apache::nifi::minifi::core {
+
+BulletinStore::BulletinStore(const Configure &configure) {
+  auto max_bulletin_count_str = 
configure.get(Configuration::nifi_c2_flow_info_processor_bulletin_limit);
+  if (!max_bulletin_count_str) {
+    logger_->log_debug("Bulletin limit not set, using default value of {}", 
DEFAULT_BULLETIN_COUNT);
+    max_bulletin_count_ = DEFAULT_BULLETIN_COUNT;
+    return;
+  }
+  try {
+    max_bulletin_count_ = std::stoul(*max_bulletin_count_str);
+  } catch(const std::exception&) {
+    logger_->log_warn("Invalid value for bulletin limit, using default value 
of {}", DEFAULT_BULLETIN_COUNT);
+    max_bulletin_count_ = DEFAULT_BULLETIN_COUNT;
+  }
+}
+
+void BulletinStore::addProcessorBulletin(const core::Processor& processor, 
core::logging::LOG_LEVEL log_level, const std::string& message) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  Bulletin bulletin;
+  bulletin.id = id_counter++;
+  bulletin.timestamp = std::chrono::system_clock::now();
+  bulletin.level = core::logging::mapLogLevelToString(log_level);
+  bulletin.category = "Log Message";
+  bulletin.message = message;
+  bulletin.group_id = processor.getProcessGroupUUIDStr();
+  bulletin.group_name = processor.getProcessGroupName();
+  bulletin.group_path = processor.getProcessGroupPath();
+  bulletin.source_id = processor.getUUIDStr();
+  bulletin.source_name = processor.getName();
+  if (bulletins_.size() >= max_bulletin_count_) {
+    bulletins_.pop_front();
+  }
+  bulletins_.push_back(std::move(bulletin));
+}
+
+std::deque<Bulletin> 
BulletinStore::getBulletins(std::optional<std::chrono::system_clock::duration> 
time_interval_to_include) const {
+  std::lock_guard<std::mutex> lock(mutex_);
+  if (!time_interval_to_include) {
+    return bulletins_;
+  }
+  for (auto it = bulletins_.begin(); it != bulletins_.end(); ++it) {
+    if (std::chrono::system_clock::now() - it->timestamp <= 
*time_interval_to_include) {
+      return {it, bulletins_.end()};
+    }
+  }

Review Comment:
   This arithmetic is difficult for my brain to parse; I would prefer something 
like this:
   ```suggestion
     const auto timestamp_cutoff = std::chrono::system_clock::now() - 
*time_interval_to_include;
     for (auto it = bulletins_.begin(); it != bulletins_.end(); ++it) {
       if (it->timestamp >= timestamp_cutoff) {
         return {it, bulletins_.end()};
       }
     }
   ```



##########
libminifi/test/unit/BulletinStoreTests.cpp:
##########
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "core/BulletinStore.h"
+#include "properties/Configure.h"
+#include "unit/DummyProcessor.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+class BulletinStoreTestAccessor {
+ public:
+  static std::deque<core::Bulletin>& getBulletins(core::BulletinStore& store) {
+    return store.bulletins_;
+  }
+};
+
+std::unique_ptr<core::Processor> createDummyProcessor() {
+  auto processor = std::make_unique<DummyProcessor>("DummyProcessor", 
minifi::utils::Identifier::parse("4d7fa7e6-2459-46dd-b2ba-61517239edf5").value());
+  processor->setProcessGroupUUIDStr("68fa9ae4-b9fc-4873-b0d9-edab59fdb0c2");
+  processor->setProcessGroupName("sub_group");
+  processor->setProcessGroupPath("root/sub_group");
+  return processor;
+}
+
+TEST_CASE("Create BulletinStore with default max size of 1000", 
"[bulletinStore]") {
+  ConfigureImpl configuration;
+  SECTION("No limit is configured") {}
+  SECTION("Invalid value is configured") {
+    configuration.set(Configure::nifi_c2_flow_info_processor_bulletin_limit, 
"invalid");
+  }
+  core::BulletinStore bulletin_store(configuration);
+  REQUIRE(bulletin_store.getMaxBulletinCount() == 1000);
+}
+
+TEST_CASE("Create BulletinStore with custom max size of 10000", 
"[bulletinStore]") {
+  ConfigureImpl configuration;
+  configuration.set(Configure::nifi_c2_flow_info_processor_bulletin_limit, 
"10000");
+  core::BulletinStore bulletin_store(configuration);
+  REQUIRE(bulletin_store.getMaxBulletinCount() == 10000);
+}
+
+TEST_CASE("Remove oldest entries when limit is reached", "[bulletinStore]") {
+  ConfigureImpl configuration;
+  configuration.set(Configure::nifi_c2_flow_info_processor_bulletin_limit, 
"2");
+  core::BulletinStore bulletin_store(configuration);
+  auto processor = createDummyProcessor();
+  for (size_t i = 0; i < 3; ++i) {
+    bulletin_store.addProcessorBulletin(*processor, logging::LOG_LEVEL::warn, 
"Warning message");
+  }
+  auto bulletins = bulletin_store.getBulletins();
+  REQUIRE(bulletins.size() == 2);
+  REQUIRE(bulletins[0].id == 2);
+  REQUIRE(bulletins[1].id == 3);
+  REQUIRE(bulletins[0].message == "Warning message");
+}
+
+TEST_CASE("Return all bulletins when no time interval is defined or all 
entries are part of the time interval", "[bulletinStore]") {

Review Comment:
   Nitpicking, but this test doesn't really test the "... or all entries are 
part of the time interval" part.  I'm OK with either expanding the test or 
changing the test description, but they should be in sync.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to