fgerlits commented on code in PR #1367:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1367#discussion_r933201410


##########
extensions/http-curl/tests/AlertTests.cpp:
##########
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#define CATCH_CONFIG_MAIN
+#include "TestBase.h"
+#include "Catch.h"
+#include "ServerAwareHandler.h"
+#include "CivetServer.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "rapidjson/document.h"
+#include "EmptyFlow.h"
+#include "Utils.h"
+#include "TestUtils.h"
+
+class AlertHandler : public ServerAwareHandler {
+ public:
+  explicit AlertHandler(std::string agent_id): agent_id_(std::move(agent_id)) 
{}
+
+  bool handlePut(CivetServer* , struct mg_connection *conn) override {
+    auto msg = readPayload(conn);
+    rapidjson::Document doc;
+    rapidjson::ParseResult res = doc.Parse(msg.c_str());
+    REQUIRE(static_cast<bool>(res));
+    REQUIRE(doc.IsObject());
+    REQUIRE(doc.HasMember("agentId"));
+    REQUIRE(doc["agentId"].IsString());
+    REQUIRE(doc.HasMember("alerts"));
+    REQUIRE(doc["alerts"].IsArray());
+    REQUIRE(doc["alerts"].Size() > 0);
+    std::string id(doc["agentId"].GetString(), 
doc["agentId"].GetStringLength());
+    REQUIRE(id == agent_id_);
+    std::vector<std::string> batch;
+    for (size_t i = 0; i < doc["alerts"].Size(); ++i) {
+      REQUIRE(doc["alerts"][i].IsString());
+      batch.emplace_back(doc["alerts"][i].GetString(), 
doc["alerts"][i].GetStringLength());
+    }
+    alerts_.enqueue(std::move(batch));
+    return true;
+  }
+
+  std::string agent_id_;
+  utils::ConditionConcurrentQueue<std::vector<std::string>> alerts_;
+};
+
+class VerifyAlerts : public HTTPIntegrationBase {
+ public:
+  void testSetup() override {}
+
+  void runAssertions() override {
+    verify_();
+  }
+
+  std::function<bool()> verify_;
+};
+
+TEST_CASE("Alert system forwards logs") {
+  auto clock = std::make_shared<utils::ManualClock>();
+  utils::timeutils::setClock(clock);
+
+  TempDirectory dir;
+  auto flow_file = std::filesystem::path(dir.getPath()) / "config.yml";

Review Comment:
   ```suggestion
     auto flow_config_file = std::filesystem::path(dir.getPath()) / "config.yml";
   ```
   since a "flow file" is a different thing



##########
extensions/http-curl/tests/AlertTests.cpp:
##########
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#define CATCH_CONFIG_MAIN
+#include "TestBase.h"
+#include "Catch.h"
+#include "ServerAwareHandler.h"
+#include "CivetServer.h"
+#include "TestServer.h"
+#include "HTTPIntegrationBase.h"
+#include "rapidjson/document.h"
+#include "EmptyFlow.h"
+#include "Utils.h"
+#include "TestUtils.h"
+
+class AlertHandler : public ServerAwareHandler {
+ public:
+  explicit AlertHandler(std::string agent_id): agent_id_(std::move(agent_id)) 
{}
+
+  bool handlePut(CivetServer* , struct mg_connection *conn) override {
+    auto msg = readPayload(conn);
+    rapidjson::Document doc;
+    rapidjson::ParseResult res = doc.Parse(msg.c_str());
+    REQUIRE(static_cast<bool>(res));
+    REQUIRE(doc.IsObject());
+    REQUIRE(doc.HasMember("agentId"));
+    REQUIRE(doc["agentId"].IsString());
+    REQUIRE(doc.HasMember("alerts"));
+    REQUIRE(doc["alerts"].IsArray());
+    REQUIRE(doc["alerts"].Size() > 0);
+    std::string id(doc["agentId"].GetString(), 
doc["agentId"].GetStringLength());
+    REQUIRE(id == agent_id_);
+    std::vector<std::string> batch;
+    for (size_t i = 0; i < doc["alerts"].Size(); ++i) {
+      REQUIRE(doc["alerts"][i].IsString());
+      batch.emplace_back(doc["alerts"][i].GetString(), 
doc["alerts"][i].GetStringLength());
+    }
+    alerts_.enqueue(std::move(batch));
+    return true;
+  }
+
+  std::string agent_id_;
+  utils::ConditionConcurrentQueue<std::vector<std::string>> alerts_;
+};
+
+class VerifyAlerts : public HTTPIntegrationBase {
+ public:
+  void testSetup() override {}
+
+  void runAssertions() override {
+    verify_();
+  }
+
+  std::function<bool()> verify_;
+};
+
+TEST_CASE("Alert system forwards logs") {
+  auto clock = std::make_shared<utils::ManualClock>();
+  utils::timeutils::setClock(clock);
+
+  TempDirectory dir;
+  auto flow_file = std::filesystem::path(dir.getPath()) / "config.yml";
+  std::ofstream(flow_file) << empty_flow;
+
+  std::string agent_id = "test-agent-1";
+  VerifyAlerts harness;
+  AlertHandler handler(agent_id);
+  harness.setUrl("http://localhost:0/api/alerts", &handler);
+  
harness.getConfiguration()->set(minifi::Configuration::nifi_c2_agent_identifier,
 agent_id);
+  harness.getConfiguration()->setHome(dir.getPath());
+
+  auto log_props = std::make_shared<logging::LoggerProperties>();
+  log_props->set("appender.alert1", "alert");
+  log_props->set("appender.alert1.url", harness.getC2RestUrl());
+  log_props->set("appender.alert1.filter", ".*<begin>(.*)<end>.*");
+  log_props->set("appender.alert1.rate.limit", "10 s");
+  log_props->set("appender.alert1.flush.period", "1 s");
+  log_props->set("logger.root", "INFO,alert1");
+  logging::LoggerConfiguration::getConfiguration().initialize(log_props);

Review Comment:
   It could be useful to add something like this in a `minifi-log.properties` 
file under `examples`.



##########
libminifi/src/core/logging/LoggerConfiguration.cpp:
##########
@@ -60,26 +62,28 @@ namespace org::apache::nifi::minifi::core::logging {
 
 const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d 
%H:%M:%S.%e] [%n] [%l] %v";
 
-namespace {
-std::optional<spdlog::level::level_enum> parse_log_level(const std::string& 
level_name) {
-  if (utils::StringUtils::equalsIgnoreCase(level_name, "trace")) {
-    return spdlog::level::trace;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "debug")) {
-    return spdlog::level::debug;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "info")) {
-    return spdlog::level::info;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "warn")) {
-    return spdlog::level::warn;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "error")) {
-    return spdlog::level::err;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "critical")) {
-    return spdlog::level::critical;
-  } else if (utils::StringUtils::equalsIgnoreCase(level_name, "off")) {
-    return spdlog::level::off;
+namespace internal {
+
+bool LoggerNamespace::findSink(std::function<bool(const 
std::shared_ptr<spdlog::sinks::sink>&)> filter) const {

Review Comment:
   It is kind of interesting that we can apply an operation to all sinks by 
using a "find" function and giving it a predicate which always returns `false`, 
but since we never use `findSink` to find a sink, I think it would be more 
readable, and simpler, to replace it with `void 
forEachSink(std::function<void(...)> operation)`.



##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& 
prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, 
std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be 
available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + 
".filter")) {
+    try {
+      config.filter = filter_str.value();
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be 
available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be 
available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + 
suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", 
prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", 
prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", 
TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", 
TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 
MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + 
".ssl.context.service");
+
+  return std::make_shared<AlertSink>(std::move(config), std::move(logger));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* 
controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", 
config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = 
controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = 
std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", 
config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", 
config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  std::match_results<std::string_view::const_iterator> match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!std::regex_match(payload.begin(), payload.end(), match, 
config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string_view submatch;
+    if (match[idx].first != match[idx].second) {
+      // TODO(adebreceni): std::string_view(It begin, It end) is not yet 
supported on all platforms
+      submatch = std::string_view(std::to_address(match[idx].first), 
std::distance(match[idx].first, match[idx].second));
+    }
+    hash = utils::hash_combine(hash, std::hash<std::string_view>{}(submatch));
+  }
+  if (!live_logs_.tryAdd(clock_->timeSinceEpoch(), hash)) {
+    return;
+  }
+
+  spdlog::memory_buf_t formatted;
+  formatter_->format(msg, formatted);
+
+  buffer_.modify([&] (LogBuffer& log_buf) {
+    log_buf.size_ += formatted.size();
+    log_buf.data_.emplace_back(std::string{formatted.data(), 
formatted.size()}, hash);
+  });
+}
+
+void AlertSink::flush_() {}
+
+void AlertSink::run() {
+  while (running_) {
+    {
+      std::unique_lock lock(mtx_);
+      next_flush_ = clock_->wait_until(cv_, lock, next_flush_, [&] {return 
!running_;}) + config_.flush_period;
+    }
+    std::unique_ptr<Services> services(services_.exchange(nullptr));
+    if (!services || !running_) {
+      continue;
+    }
+    try {
+      send(*services);
+    } catch (const std::exception& err) {
+      logger_->log_error("Exception while sending logs: %s", err.what());
+    } catch (...) {
+      logger_->log_error("Unknown exception while sending logs");
+    }
+    Services* expected{nullptr};
+    // only restore the services pointer if no initialize set it to something 
else meanwhile
+    if (services_.compare_exchange_strong(expected, services.get())) {
+      (void)services.release();
+    }
+  }
+}
+
+AlertSink::~AlertSink() {
+  {
+    std::lock_guard lock(mtx_);
+    running_ = false;
+    cv_.notify_all();
+  }
+  if (flush_thread_.joinable()) {
+    flush_thread_.join();
+  }
+}

Review Comment:
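   Do we need to delete `services_`, too?  The destructor joins the flush
   thread, but any `Services` object the `services_` pointer still holds at
   that point is never freed.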



##########
libminifi/include/utils/Id.h:
##########
@@ -141,18 +142,14 @@ struct hash<org::apache::nifi::minifi::utils::Identifier> 
{
   size_t operator()(const org::apache::nifi::minifi::utils::Identifier& id) 
const noexcept {
     static_assert(sizeof(org::apache::nifi::minifi::utils::Identifier) % 
sizeof(size_t) == 0);
     constexpr int slices = 
sizeof(org::apache::nifi::minifi::utils::Identifier) / sizeof(size_t);

Review Comment:
   I know it's not new code, but this hash function has problems: if 
`sizeof(Identifier) < sizeof(size_t)`, then it reads uninitialized memory, and 
if `sizeof(Identifier)` is not a multiple of `sizeof(size_t)`, then the hash 
doesn't depend on the last few bytes of the `Identifier`.
   
   Of course, in real life the size of `Identifier` is 16, which is either 2 or 
4 times the size of `size_t`, so this is not a problem.  Maybe we could add a 
`static_assert(sizeof(Identifier) >= sizeof(size_t) && sizeof(Identifier) % 
sizeof(size_t) == 0)` for now?



##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -70,6 +72,16 @@ class Clock {
  public:
   virtual ~Clock() = default;
   virtual std::chrono::milliseconds timeSinceEpoch() const = 0;
+  virtual std::chrono::milliseconds wait_until(std::condition_variable& cv, 
std::unique_lock<std::mutex>& lck, std::chrono::milliseconds time, const 
std::function<bool()>& pred) {

Review Comment:
   I would prefer to have a parallel `ThreadSafeClock` hierarchy with classes 
which have a `Clock` (subclass) as a member instead of adding complexity to the 
`Clock` classes directly.



##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "controllers/SSLContextService.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+  struct Config {
+    std::string url;
+    std::optional<std::string> ssl_service_name;
+    int batch_size;
+    std::chrono::milliseconds flush_period;
+    std::chrono::milliseconds rate_limit;
+    int buffer_limit;
+    std::regex filter;
+    spdlog::level::level_enum level;
+  };
+
+  struct Services {
+    std::shared_ptr<controllers::SSLContextService> ssl_service;
+    std::shared_ptr<AgentIdentificationProvider> agent_id;
+  };
+
+  struct LogBuffer {
+    size_t size_{0};
+    std::deque<std::pair<std::string, size_t>> data_;
+
+    static LogBuffer allocate(size_t size);
+    LogBuffer commit();
+    [[nodiscard]]
+    size_t size() const;
+  };
+
+  class LiveLogSet {
+    std::chrono::milliseconds lifetime_{};
+    std::unordered_set<size_t> ignored_;
+    std::deque<std::pair<std::chrono::milliseconds, size_t>> ordered_;

Review Comment:
   I think these could be named better; maybe
   ```suggestion
       std::unordered_set<size_t> hashes_to_ignore_;
       std::deque<std::pair<std::chrono::milliseconds, size_t>> timestamped_hashes_;
   ```
   ?
   
   Also, typedefing `size_t` to something like `Hash` could help with 
readability.



##########
libminifi/include/core/logging/alert/AlertSink.h:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <unordered_set>
+#include <regex>
+#include <utility>
+#include <string>
+#include <memory>
+
+#include "controllers/SSLContextService.h"
+#include "core/controller/ControllerServiceProvider.h"
+#include "core/logging/LoggerProperties.h"
+#include "utils/ThreadPool.h"
+#include "utils/StagingQueue.h"
+#include "properties/Configure.h"
+#include "spdlog/sinks/base_sink.h"
+
+
+namespace org::apache::nifi::minifi::core::logging {
+
+class AlertSink : public spdlog::sinks::base_sink<std::mutex> {
+  struct Config {
+    std::string url;
+    std::optional<std::string> ssl_service_name;
+    int batch_size;
+    std::chrono::milliseconds flush_period;
+    std::chrono::milliseconds rate_limit;
+    int buffer_limit;
+    std::regex filter;

Review Comment:
   Are you sure we want to use `std::regex`?  Elsewhere we generally use 
`utils::Regex` because `std::regex` has performance issues with long targets.  
Even if long targets are not likely here, we may prefer to use the same kind of 
regex everywhere.



##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;
+  flush_thread_ = std::thread([this] {run();});
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& 
prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, 
std::shared_ptr<Logger> logger) {
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be 
available", prop_name_prefix);
+    return {};
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + 
".filter")) {
+    try {
+      config.filter = filter_str.value();
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be 
available: %s", prop_name_prefix, err.what());
+      return {};
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be 
available", prop_name_prefix);
+    return {};
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + 
suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", 
prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", 
prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", 
TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", 
TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 
MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + 
".ssl.context.service");
+
+  return std::make_shared<AlertSink>(std::move(config), std::move(logger));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* 
controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", 
config_.ssl_service_name.value());
+      return;
+    }
+    if (auto service = 
controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = 
std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", 
config_.ssl_service_name.value());
+        return;
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", 
config_.ssl_service_name.value());
+      return;
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {
+  // this method is protected upstream in base_sink by a mutex
+
+  std::match_results<std::string_view::const_iterator> match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!std::regex_match(payload.begin(), payload.end(), match, 
config_.filter)) {
+    return;
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {
+    std::string_view submatch;
+    if (match[idx].first != match[idx].second) {
+      // TODO(adebreceni): std::string_view(It begin, It end) is not yet 
supported on all platforms

Review Comment:
   Can you note in the comment which platforms do not support it yet, please?
   That way, we'll know when we can remove the workaround.



##########
libminifi/src/core/logging/alert/AlertSink.cpp:
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/logging/alert/AlertSink.h"
+#include "core/TypedValues.h"
+#include "core/ClassLoader.h"
+#include "utils/HTTPClient.h"
+#include "utils/Hash.h"
+#include "core/logging/Utils.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::core::logging {
+
+AlertSink::AlertSink(Config config, std::shared_ptr<Logger> logger)  // Stores the configuration and logger, applies level / rate limit / first flush time, then starts the flush thread.
+    : config_(std::move(config)),
+      buffer_(config_.buffer_limit, config_.batch_size),  // reads the member config_, not the moved-from parameter; NOTE(review): relies on config_ being declared before buffer_ in the header - confirm
+      logger_(std::move(logger)) {
+  set_level(config_.level);
+  live_logs_.setLifetime(config_.rate_limit);  // rate_limit becomes the lifetime of the entries tracked in live_logs_
+  next_flush_ = clock_->timeSinceEpoch() + config_.flush_period;  // the first flush is due one flush_period from now
+  flush_thread_ = std::thread([this] {run();});  // run() loops on this thread until the destructor clears running_ and joins it
+}
+
+std::shared_ptr<AlertSink> AlertSink::create(const std::string& 
prop_name_prefix, const std::shared_ptr<LoggerProperties>& logger_properties, 
std::shared_ptr<Logger> logger) {  // Factory: reads the properties under prop_name_prefix into a Config and returns a new AlertSink, or an empty pointer when the url is missing or the filter is missing or invalid.
+  Config config;
+
+  if (auto url = logger_properties->getString(prop_name_prefix + ".url")) {
+    config.url = url.value();
+  } else {
+    logger->log_info("Missing '%s.url' value, network logging won't be 
available", prop_name_prefix);
+    return {};  // no url configured: no sink is created (logged at info level only)
+  }
+
+  if (auto filter_str = logger_properties->getString(prop_name_prefix + 
".filter")) {
+    try {
+      config.filter = filter_str.value();  // this assignment is what may throw the std::regex_error handled just below
+    } catch (const std::regex_error& err) {
+      logger->log_error("Invalid '%s.filter' value, network logging won't be 
available: %s", prop_name_prefix, err.what());
+      return {};  // invalid filter expression: no sink is created
+    }
+  } else {
+    logger->log_error("Missing '%s.filter' value, network logging won't be 
available", prop_name_prefix);
+    return {};  // the filter is mandatory: no sink is created
+  }
+
+  auto readPropertyOr = [&] (auto suffix, auto parser, auto fallback) {  // helper: fetch the property at prefix + suffix and parse it; when it is missing or unparsable, log that and parse the fallback text instead
+    if (auto prop_str = logger_properties->getString(prop_name_prefix + 
suffix)) {
+      if (auto prop_val = parser(prop_str.value())) {
+        return prop_val.value();
+      }
+      logger->log_error("Invalid '%s' value, using default '%s'", 
prop_name_prefix + suffix, fallback);
+    } else {
+      logger->log_info("Missing '%s' value, using default '%s'", 
prop_name_prefix + suffix, fallback);
+    }
+    return parser(fallback).value();  // .value() assumes the hard-coded fallback always parses
+  };
+
+  auto datasize_parser = [] (const std::string& str) -> std::optional<int> {  // adapts DataSizeValue::StringToInt (bool result plus out-parameter) to the optional-returning shape readPropertyOr expects
+    int val;
+    if (DataSizeValue::StringToInt(str, val)) {
+      return val;
+    }
+    return {};
+  };
+
+  config.batch_size = readPropertyOr(".batch.size", datasize_parser, "100 KB");
+  config.flush_period = readPropertyOr(".flush.period", 
TimePeriodValue::fromString, "5 s").getMilliseconds();
+  config.rate_limit = readPropertyOr(".rate.limit", 
TimePeriodValue::fromString, "10 min").getMilliseconds();
+  config.buffer_limit = readPropertyOr(".buffer.limit", datasize_parser, "1 
MB");
+  config.level = readPropertyOr(".level", utils::parse_log_level, "trace");
+  config.ssl_service_name = logger_properties->getString(prop_name_prefix + 
".ssl.context.service");  // stored exactly as returned by getString; initialize() tests whether it is set
+
+  return std::make_shared<AlertSink>(std::move(config), std::move(logger));
+}
+
+void AlertSink::initialize(core::controller::ControllerServiceProvider* 
controller, std::shared_ptr<AgentIdentificationProvider> agent_id) {  // Builds a fresh Services bundle (agent id plus optional SSL context service) and swaps it into services_; on any lookup failure it logs and leaves services_ untouched.
+  auto services = std::make_unique<Services>();
+
+  services->agent_id = std::move(agent_id);
+
+  if (config_.ssl_service_name) {  // an SSL context service name is configured, so it has to be resolved
+    if (!controller) {
+      logger_->log_error("Could not find service '%s': no service provider", 
config_.ssl_service_name.value());
+      return;  // the local bundle is discarded, services_ is not modified
+    }
+    if (auto service = 
controller->getControllerService(config_.ssl_service_name.value())) {
+      if (auto ssl_service = 
std::dynamic_pointer_cast<controllers::SSLContextService>(service)) {
+        services->ssl_service = ssl_service;
+      } else {
+        logger_->log_error("Service '%s' is not an SSLContextService", 
config_.ssl_service_name.value());
+        return;  // the lookup succeeded but the dynamic cast to SSLContextService failed
+      }
+    } else {
+      logger_->log_error("Could not find service '%s'", 
config_.ssl_service_name.value());
+      return;  // getControllerService returned nothing for the configured name
+    }
+  }
+
+  services.reset(services_.exchange(services.release()));  // publish the new bundle; the previously stored pointer (possibly null) is taken over by the local unique_ptr and freed when it goes out of scope
+}
+
+void AlertSink::sink_it_(const spdlog::details::log_msg& msg) {  // Filters one log message against config_.filter, hashes its capture groups, and appends the formatted text to buffer_ unless live_logs_ rejects the hash.
+  // this method is protected upstream in base_sink by a mutex
+
+  std::match_results<std::string_view::const_iterator> match;
+  std::string_view payload(msg.payload.data(), msg.payload.size());
+  if (!std::regex_match(payload.begin(), payload.end(), match, 
config_.filter)) {
+    return;  // regex_match requires the whole payload to match the filter; anything else is ignored
+  }
+  size_t hash = 0;
+  for (size_t idx = 1; idx < match.size(); ++idx) {  // starts at 1: match[0] is the whole match, only the capture groups feed the hash
+    std::string_view submatch;  // stays empty when the group's iterator range is empty
+    if (match[idx].first != match[idx].second) {
+      // TODO(adebreceni): std::string_view(It begin, It end) is not yet 
supported on all platforms
+      submatch = std::string_view(std::to_address(match[idx].first), 
std::distance(match[idx].first, match[idx].second));
+    }
+    hash = utils::hash_combine(hash, std::hash<std::string_view>{}(submatch));
+  }
+  if (!live_logs_.tryAdd(clock_->timeSinceEpoch(), hash)) {
+    return;  // tryAdd refused the hash; presumably an entry with the same hash is still within its lifetime - confirm against the live_logs_ type
+  }
+
+  spdlog::memory_buf_t formatted;
+  formatter_->format(msg, formatted);
+
+  buffer_.modify([&] (LogBuffer& log_buf) {
+    log_buf.size_ += formatted.size();  // size_ grows by the formatted length of each stored entry
+    log_buf.data_.emplace_back(std::string{formatted.data(), 
formatted.size()}, hash);
+  });
+}
+
+void AlertSink::flush_() {}  // no-op: buffered logs are sent from run() on flush_thread_, not from here
+
+void AlertSink::run() {  // Body of flush_thread_: repeatedly waits for the next flush time, then sends the buffered logs, until running_ becomes false.
+  while (running_) {
+    {
+      std::unique_lock lock(mtx_);
+      next_flush_ = clock_->wait_until(cv_, lock, next_flush_, [&] {return 
!running_;}) + config_.flush_period;  // NOTE(review): the value returned by wait_until is the base of the next deadline - confirm what the clock returns
+    }
+    std::unique_ptr<Services> services(services_.exchange(nullptr));  // take the stored bundle for this iteration; services_ holds null meanwhile
+    if (!services || !running_) {
+      continue;  // nothing was stored in services_, or shutdown was requested; a bundle taken above is destroyed here
+    }
+    try {
+      send(*services);
+    } catch (const std::exception& err) {
+      logger_->log_error("Exception while sending logs: %s", err.what());
+    } catch (...) {
+      logger_->log_error("Unknown exception while sending logs");  // failures are only logged, so the loop carries on to the next cycle
+    }
+    Services* expected{nullptr};
+    // only restore the services pointer if no initialize set it to something 
else meanwhile
+    if (services_.compare_exchange_strong(expected, services.get())) {
+      (void)services.release();  // services_ owns the bundle again; if the exchange failed instead, the unique_ptr deletes this bundle at the end of the iteration
+    }
+  }
+}
+
+AlertSink::~AlertSink() {  // Requests shutdown of the flush thread and waits for it to finish. NOTE(review): nothing visible here frees a Services pointer still stored in services_ - confirm it is released elsewhere.
+  {
+    std::lock_guard lock(mtx_);
+    running_ = false;  // cleared while holding mtx_, the mutex run() passes to its wait on cv_
+    cv_.notify_all();  // wakes run() if it is waiting for the next flush
+  }
+  if (flush_thread_.joinable()) {
+    flush_thread_.join();
+  }
+}
+
+void AlertSink::send(Services& services) {
+  LogBuffer logs;
+  buffer_.commit();
+  if (!buffer_.tryDequeue(logs)) {
+    return;
+  }
+
+  auto client = 
core::ClassLoader::getDefaultClassLoader().instantiate<utils::BaseHTTPClient>("HTTPClient",
 "HTTPClient");
+  if (!client) {
+    logger_->log_error("Could not instantiate a HTTPClient object");
+    return;
+  }
+  client->initialize("PUT", config_.url, services.ssl_service);
+
+  rapidjson::Document doc(rapidjson::kObjectType);
+  std::string agent_id = services.agent_id->getAgentIdentifier();
+  doc.AddMember("agentId", rapidjson::Value(agent_id.data(), 
agent_id.length()), doc.GetAllocator());
+  doc.AddMember("alerts", rapidjson::Value(rapidjson::kArrayType), 
doc.GetAllocator());
+  for (const auto& [log, _] : logs.data_) {
+    doc["alerts"].PushBack(rapidjson::Value(log.data(), log.size()), 
doc.GetAllocator());
+  }
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  doc.Accept(writer);
+
+  auto data_input = std::make_unique<utils::ByteInputCallback>();
+  auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
+  data_input->write(std::string(buffer.GetString(), buffer.GetSize()));
+  data_cb->ptr = data_input.get();
+  client->setUploadCallback(data_cb.get());
+  client->setContentType("application/json");
+
+  bool req_success = client->submit();
+
+  int64_t resp_code = client->getResponseCode();
+  const bool client_err = 400 <= resp_code && resp_code < 500;
+  const bool server_err = 500 <= resp_code && resp_code < 600;
+  if (client_err || server_err) {
+    logger_->log_error("Error response code '" "%" PRId64 "' from '%s'", 
resp_code, config_.url);
+  } else {
+    logger_->log_debug("Response code '" "%" PRId64 "' from '%s'", resp_code, 
config_.url);

Review Comment:
   This could be logged at `info` or `warning` level if the response code is 
not an error, but not a `2xx` success, either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to