This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 93ef73a00f8775f0bdcd63554ab10fdabb49652f Author: Gabor Gyimesi <gamezb...@gmail.com> AuthorDate: Mon Jan 2 15:08:34 2023 +0100 MINIFICPP-2022 Add valid repository size metrics for all repositories Signed-off-by: Ferenc Gerlits <fgerl...@gmail.com> This closes #1490 --- METRICS.md | 40 ++-- controller/tests/ControllerTests.cpp | 4 +- .../cluster/checkers/PrometheusChecker.py | 19 +- extensions/coap/tests/CoapIntegrationBase.h | 4 +- extensions/http-curl/tests/C2PauseResumeTest.cpp | 3 +- .../tests/PrometheusMetricsPublisherTest.cpp | 9 +- .../rocksdb-repos/DatabaseContentRepository.cpp | 19 ++ .../rocksdb-repos/DatabaseContentRepository.h | 3 + extensions/rocksdb-repos/FlowFileRepository.cpp | 73 +------ extensions/rocksdb-repos/FlowFileRepository.h | 33 +-- extensions/rocksdb-repos/ProvenanceRepository.cpp | 94 +++------ extensions/rocksdb-repos/ProvenanceRepository.h | 25 +-- extensions/rocksdb-repos/RocksDbRepository.cpp | 112 +++++++++++ extensions/rocksdb-repos/RocksDbRepository.h | 63 ++++++ extensions/rocksdb-repos/database/OpenRocksDb.cpp | 13 ++ extensions/rocksdb-repos/database/OpenRocksDb.h | 3 + libminifi/include/core/ContentRepository.h | 12 +- libminifi/include/core/Repository.h | 30 +-- libminifi/include/core/RepositoryFactory.h | 5 +- libminifi/include/core/RepositoryMetricsSource.h | 45 +++++ .../include/core/repository/FileSystemRepository.h | 14 ++ .../core/repository/VolatileContentRepository.h | 18 +- .../include/core/repository/VolatileRepository.h | 16 +- .../core/repository/VolatileRepositoryData.h | 18 ++ .../include/core/state/MetricsPublisherStore.h | 5 +- .../include/core/state/nodes/AgentInformation.h | 73 ++++--- .../include/core/state/nodes/RepositoryMetrics.h | 56 ++++-- .../include/core/state/nodes/ResponseNodeLoader.h | 8 +- libminifi/include/utils/file/FileUtils.h | 14 ++ libminifi/src/core/RepositoryFactory.cpp | 9 + .../src/core/repository/VolatileRepository.cpp | 6 +- .../src/core/repository/VolatileRepositoryData.cpp | 1 + libminifi/src/core/state/MetricsPublisherStore.cpp | 6 +- .../src/core/state/nodes/ResponseNodeLoader.cpp | 26 +-- libminifi/test/flow-tests/TestControllerWithFlow.h | 4 +- libminifi/test/integration/IntegrationBase.h | 5 +- .../rocksdb-tests/DBProvenanceRepositoryTests.cpp | 2 +- libminifi/test/rocksdb-tests/RepoTests.cpp | 221 +++++++++++++++++++++ .../unit/ControllerSocketMetricsPublisherTest.cpp | 2 +- libminifi/test/unit/FileUtilsTests.cpp | 39 ++++ libminifi/test/unit/MetricsTests.cpp | 140 ++++++------- libminifi/test/unit/ProvenanceTestHelper.h | 51 ++++- libminifi/test/unit/ResponseNodeLoaderTests.cpp | 2 +- minifi_main/MiNiFiMain.cpp | 3 +- 44 files changed, 931 insertions(+), 417 deletions(-) diff --git a/METRICS.md b/METRICS.md index f6c87dbaf..f0cc52d54 100644 --- a/METRICS.md +++ b/METRICS.md @@ -100,17 +100,19 @@ QueueMetrics is a system level metric that reports queue metrics for every conne ### RepositoryMetrics -RepositoryMetrics is a system level metric that reports metrics for the registered repositories (by default flowfile and provenance repository) +RepositoryMetrics is a system level metric that reports metrics for the registered repositories (by default flowfile, content, and provenance repositories) -| Metric name | Labels | Description | -|----------------------|-----------------|---------------------------------------| -| is_running | repository_name | Is the repository running (1 or 0) | -| is_full | repository_name | Is the repository full (1 or 0) | -| repository_size | repository_name | Current size of the repository | +| Metric name | Labels | Description | +|---------------------------|-----------------|-------------------------------------------------| +| is_running | repository_name | Is the repository running (1 or 0) | +| is_full | repository_name | Is the repository full (1 or 0) | +| repository_size_bytes | repository_name | Current size of the repository | +| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) | +| repository_entry_count | repository_name | Current number of entries in the repository | -| Label | Description | -|--------------------------|-----------------------------------------------------------------| -| repository_name | Name of the reported repository | +| Label | Description | +|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------| +| repository_name | Name of the reported repository. There are three repositories present with the following names: `flowfile`, `content` and `provenance` | ### DeviceInfoNode @@ -145,15 +147,17 @@ FlowInformation is a system level metric that reports component and queue relate AgentStatus is a system level metric that defines current agent status including repository, component and resource usage information. -| Metric name | Labels | Description | -|--------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------| -| is_running | repository_name | Is the repository running (1 or 0) | -| is_full | repository_name | Is the repository full (1 or 0) | -| repository_size | repository_name | Current size of the repository | -| uptime_milliseconds | - | Agent uptime in milliseconds | -| is_running | component_uuid, component_name | Check if the component is running (1 or 0) | -| agent_memory_usage_bytes | - | Memory used by the agent process in bytes | -| agent_cpu_utilization | - | CPU utilization of the agent process (between 0 and 1). In case of a query error the returned value is -1. | +| Metric name | Labels | Description | +|---------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------| +| is_running | repository_name | Is the repository running (1 or 0) | +| is_full | repository_name | Is the repository full (1 or 0) | +| repository_size_bytes | repository_name | Current size of the repository | +| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) | +| repository_entry_count | repository_name | Current number of entries in the repository | +| uptime_milliseconds | - | Agent uptime in milliseconds | +| is_running | component_uuid, component_name | Check if the component is running (1 or 0) | +| agent_memory_usage_bytes | - | Memory used by the agent process in bytes | +| agent_cpu_utilization | - | CPU utilization of the agent process (between 0 and 1). In case of a query error the returned value is -1. | | Label | Description | |-----------------|----------------------------------------------------------| diff --git a/controller/tests/ControllerTests.cpp b/controller/tests/ControllerTests.cpp index 49ad26f79..c88af54df 100644 --- a/controller/tests/ControllerTests.cpp +++ b/controller/tests/ControllerTests.cpp @@ -514,7 +514,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test manifest getter", "[controllerTest } auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher"); - auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, nullptr, nullptr, nullptr); + auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr); reporter->initialize(configuration_, response_node_loader); initalizeControllerSocket(reporter); @@ -538,7 +538,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test jstack getter", "[controllerTests] } auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher"); - auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, nullptr, nullptr, nullptr); + auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr); reporter->initialize(configuration_, response_node_loader); initalizeControllerSocket(reporter); diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py b/docker/test/integration/cluster/checkers/PrometheusChecker.py index 461e9a5e8..bd7daff8b 100644 --- a/docker/test/integration/cluster/checkers/PrometheusChecker.py +++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py @@ -47,8 +47,9 @@ class PrometheusChecker: raise Exception("Metric class '%s' verification is not implemented" % metric_class) def verify_repository_metrics(self): - label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}] - return all((self.verify_metrics_exist(['minifi_is_running', 'minifi_is_full', 'minifi_repository_size'], 'RepositoryMetrics', labels) for labels in label_list)) + label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}, {'repository_name': 'content'}] + return all((self.verify_metrics_exist(['minifi_is_running', 'minifi_is_full', 'minifi_repository_size_bytes', 'minifi_max_repository_size_bytes', 'minifi_repository_entry_count'], 'RepositoryMetrics', labels) for labels in label_list)) and \ + all((self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 'RepositoryMetrics', labels) for labels in label_list[1:3])) def verify_queue_metrics(self): return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 'QueueMetrics') @@ -72,12 +73,22 @@ class PrometheusChecker: return self.verify_metrics_exist(['minifi_physical_mem', 'minifi_memory_usage', 'minifi_cpu_utilization'], 'DeviceInfoNode') def verify_agent_status_metrics(self): - label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}] + label_list = [{'repository_name': 'flowfile'}, {'repository_name': 'content'}] for labels in label_list: if not (self.verify_metric_exists('minifi_is_running', 'AgentStatus', labels) and self.verify_metric_exists('minifi_is_full', 'AgentStatus', labels) - and self.verify_metric_exists('minifi_repository_size', 'AgentStatus', labels)): + and self.verify_metric_exists('minifi_max_repository_size_bytes', 'AgentStatus', labels) + and self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 'AgentStatus', labels) + and self.verify_metric_exists('minifi_repository_entry_count', 'AgentStatus', labels)): return False + + # provenance repository is NoOpRepository by default which has zero size + if not (self.verify_metric_exists('minifi_is_running', 'AgentStatus', {'repository_name': 'provenance'}) + and self.verify_metric_exists('minifi_is_full', 'AgentStatus', {'repository_name': 'provenance'}) + and self.verify_metric_exists('minifi_max_repository_size_bytes', 'AgentStatus', {'repository_name': 'provenance'}) + and self.verify_metric_exists('minifi_repository_size_bytes', 'AgentStatus', {'repository_name': 'provenance'}) + and self.verify_metric_exists('minifi_repository_entry_count', 'AgentStatus', {'repository_name': 'provenance'})): + return False return self.verify_metric_exists('minifi_uptime_milliseconds', 'AgentStatus') and \ self.verify_metric_exists('minifi_agent_memory_usage_bytes', 'AgentStatus') and \ self.verify_metric_exists('minifi_agent_cpu_utilization', 'AgentStatus') diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h index b498e1240..e7fbc1e65 100644 --- a/extensions/coap/tests/CoapIntegrationBase.h +++ b/extensions/coap/tests/CoapIntegrationBase.h @@ -21,6 +21,7 @@ #include <optional> #include <string> #include <utility> +#include <vector> #include "../tests/TestServer.h" #include "CivetServer.h" @@ -72,7 +73,8 @@ class CoapIntegrationBase : public IntegrationBase { queryRootProcessGroup(pg); - auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, test_repo, test_flow_repo, yaml_ptr); + std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{test_repo, test_flow_repo, content_repo}; + auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, repo_metric_sources, yaml_ptr); auto controller = std::make_unique<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, std::move(metrics_publisher_store)); controller->load(); diff --git a/extensions/http-curl/tests/C2PauseResumeTest.cpp b/extensions/http-curl/tests/C2PauseResumeTest.cpp index 78bd1c54f..cf8e137a9 100644 --- a/extensions/http-curl/tests/C2PauseResumeTest.cpp +++ b/extensions/http-curl/tests/C2PauseResumeTest.cpp @@ -135,7 +135,8 @@ int main(int argc, char **argv) { auto yaml_ptr = std::make_shared<core::YamlConfiguration>(core::ConfigurationContext{test_repo, content_repo, stream_factory, configuration, args.test_file}); - auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, test_repo, test_flow_repo, yaml_ptr); + std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{test_repo, test_flow_repo, content_repo}; + auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, repo_metric_sources, yaml_ptr); std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, std::move(metrics_publisher_store)); diff --git a/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp b/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp index 6b39468f5..dcb11082f 100644 --- a/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp +++ b/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp @@ -52,7 +52,9 @@ class PrometheusPublisherTestFixture { : configuration_(std::make_shared<Configure>()), provenance_repo_(core::createRepository("provenancerepository")), flow_file_repo_(core::createRepository("flowfilerepository")), - response_node_loader_(std::make_shared<state::response::ResponseNodeLoader>(configuration_, provenance_repo_, flow_file_repo_, nullptr)) { + content_repo_(core::createContentRepository("volatilecontentrepository")), + response_node_loader_(std::make_shared<state::response::ResponseNodeLoader>(configuration_, + std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{provenance_repo_, flow_file_repo_, content_repo_}, nullptr)) { std::unique_ptr<DummyMetricsExposer> dummy_exposer; if (user_dummy_exposer) { dummy_exposer = std::make_unique<DummyMetricsExposer>(); @@ -63,8 +65,9 @@ class PrometheusPublisherTestFixture { protected: std::shared_ptr<Configure> configuration_; - std::shared_ptr<core::Repository> provenance_repo_; - std::shared_ptr<core::Repository> flow_file_repo_; + std::shared_ptr<core::RepositoryMetricsSource> provenance_repo_; + std::shared_ptr<core::RepositoryMetricsSource> flow_file_repo_; + std::shared_ptr<core::RepositoryMetricsSource> content_repo_; std::shared_ptr<state::response::ResponseNodeLoader> response_node_loader_; std::unique_ptr<PrometheusMetricsPublisher> publisher_; DummyMetricsExposer* exposer_ = nullptr; diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 187776778..56c0d2c83 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -267,6 +267,25 @@ void DatabaseContentRepository::clearOrphans() { } } +uint64_t DatabaseContentRepository::getRepositorySize() const { + return (utils::optional_from_ptr(db_.get()) | + utils::flatMap([](const auto& db) { return db->open(); }) | + utils::flatMap([](const auto& opendb) { return opendb.getApproximateSizes(); })).value_or(0); +} + +uint64_t DatabaseContentRepository::getRepositoryEntryCount() const { + return (utils::optional_from_ptr(db_.get()) | + utils::flatMap([](const auto& db) { return db->open(); }) | + utils::flatMap([](auto&& opendb) -> std::optional<uint64_t> { + std::string key_count; + opendb.GetProperty("rocksdb.estimate-num-keys", &key_count); + if (!key_count.empty()) { + return std::stoull(key_count); + } + return std::nullopt; + })).value_or(0); +} + REGISTER_RESOURCE_AS(DatabaseContentRepository, InternalResource, ("DatabaseContentRepository", "databasecontentrepository")); } // namespace org::apache::nifi::minifi::core::repository diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h index 1305a7c2d..e084a6515 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.h +++ b/extensions/rocksdb-repos/DatabaseContentRepository.h @@ -74,6 +74,9 @@ class DatabaseContentRepository : public core::ContentRepository { void start() override; void stop() override; + uint64_t getRepositorySize() const override; + uint64_t getRepositoryEntryCount() const override; + protected: bool removeKey(const std::string& content_path) override; diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp index 36999c4e0..b38a7fdce 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.cpp +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -30,6 +30,7 @@ #include "FlowFileRecord.h" #include "utils/gsl.h" #include "core/Resource.h" +#include "utils/OptionalUtils.h" using namespace std::literals::chrono_literals; @@ -110,24 +111,6 @@ void FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal } } -void FlowFileRepository::printStats() { - auto opendb = db_->open(); - if (!opendb) { - return; - } - std::string key_count; - opendb->GetProperty("rocksdb.estimate-num-keys", &key_count); - - std::string table_readers; - opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); - - std::string all_memtables; - opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); - - logger_->log_info("Repository stats: key count: %s, table readers size: %s, all memory tables size: %s", - key_count, table_readers, all_memtables); -} - void FlowFileRepository::run() { auto last = std::chrono::steady_clock::now(); while (isRunning()) { @@ -142,22 +125,6 @@ void FlowFileRepository::run() { flush(); } -bool FlowFileRepository::ExecuteWithRetry(const std::function<rocksdb::Status()>& operation) { - constexpr int RETRY_COUNT = 3; - std::chrono::milliseconds wait_time = 0ms; - for (int i=0; i < RETRY_COUNT; ++i) { - auto status = operation(); - if (status.ok()) { - logger_->log_trace("Rocksdb operation executed successfully"); - return true; - } - logger_->log_error("Rocksdb operation failed: %s", status.ToString()); - wait_time += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS; - std::this_thread::sleep_for(wait_time); - } - return false; -} - void FlowFileRepository::initialize_repository() { auto opendb = db_->open(); if (!opendb) { @@ -204,7 +171,6 @@ void FlowFileRepository::initialize_repository() { void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) { content_repo_ = content_repo; - repo_size_ = 0; swap_loader_ = std::make_unique<FlowFileLoader>(gsl::make_not_null(db_.get()), content_repo_); initialize_repository(); @@ -276,48 +242,11 @@ void FlowFileRepository::setCompactionPeriod(const std::shared_ptr<Configure> &c } } -bool FlowFileRepository::Put(const std::string& key, const uint8_t *buf, size_t bufLen) { - // persistent to the DB - auto opendb = db_->open(); - if (!opendb) { - return false; - } - rocksdb::Slice value((const char *) buf, bufLen); - auto operation = [&key, &value, &opendb]() { return opendb->Put(rocksdb::WriteOptions(), key, value); }; - return ExecuteWithRetry(operation); -} - -bool FlowFileRepository::MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) { - auto opendb = db_->open(); - if (!opendb) { - return false; - } - auto batch = opendb->createWriteBatch(); - for (const auto &item : data) { - const auto buf = item.second->getBuffer().as_span<const char>(); - rocksdb::Slice value(buf.data(), buf.size()); - if (!batch.Put(item.first, value).ok()) { - logger_->log_error("Failed to add item to batch operation"); - return false; - } - } - auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); }; - return ExecuteWithRetry(operation); -} - bool FlowFileRepository::Delete(const std::string& key) { keys_to_delete_.enqueue({.key = key}); return true; } -bool FlowFileRepository::Get(const std::string &key, std::string &value) { - auto opendb = db_->open(); - if (!opendb) { - return false; - } - return opendb->Get(rocksdb::ReadOptions(), key, &value).ok(); -} - void FlowFileRepository::runCompaction() { do { if (auto opendb = db_->open()) { diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h index 68959ce88..dd1412ada 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.h +++ b/extensions/rocksdb-repos/FlowFileRepository.h @@ -23,13 +23,11 @@ #include <list> #include "utils/file/FileUtils.h" -#include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" #include "rocksdb/utilities/checkpoint.h" #include "core/Core.h" #include "core/logging/LoggerConfiguration.h" -#include "core/ThreadedRepository.h" #include "Connection.h" #include "concurrentqueue.h" #include "database/RocksDatabase.h" @@ -40,6 +38,7 @@ #include "range/v3/algorithm/all_of.hpp" #include "utils/Literals.h" #include "utils/StoppableThread.h" +#include "RocksDbRepository.h" namespace org::apache::nifi::minifi::core::repository { @@ -53,13 +52,12 @@ constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = "./flowfile_checkpoint"; constexpr auto MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE = 10_MiB; constexpr auto MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME = std::chrono::minutes(10); constexpr auto FLOWFILE_REPOSITORY_PURGE_PERIOD = std::chrono::seconds(2); -constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = std::chrono::milliseconds(500); /** * Flow File repository * Design: Extends Repository and implements the run function, using rocksdb as the primary substrate. */ -class FlowFileRepository : public ThreadedRepository, public SwapManager { +class FlowFileRepository : public RocksDbRepository, public SwapManager { static constexpr std::chrono::milliseconds DEFAULT_COMPACTION_PERIOD = std::chrono::minutes{2}; struct ExpiredFlowFileInfo { @@ -79,8 +77,8 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager { std::chrono::milliseconds maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, std::chrono::milliseconds purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD) - : ThreadedRepository(repo_name.length() > 0 ? std::move(repo_name) : core::getClassName<FlowFileRepository>(), std::move(directory), maxPartitionMillis, maxPartitionBytes, purgePeriod), - logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()) { + : RocksDbRepository(repo_name.length() > 0 ? std::move(repo_name) : core::getClassName<FlowFileRepository>(), + std::move(directory), maxPartitionMillis, maxPartitionBytes, purgePeriod, logging::LoggerFactory<FlowFileRepository>::getLogger()) { } ~FlowFileRepository() override { @@ -95,18 +93,11 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager { return false; } - void flush() override; - - virtual void printStats(); - - bool initialize(const std::shared_ptr<Configure> &configure) override; - - bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override; - bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override; bool Delete(const std::string& key) override; bool Delete(const std::shared_ptr<core::CoreComponent>& item) override; - bool Get(const std::string &key, std::string &value) override; + void flush() override; + bool initialize(const std::shared_ptr<Configure> &configure) override; void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) override; bool start() override; bool stop() override; @@ -115,27 +106,17 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager { private: void run() override; + void initialize_repository(); void runCompaction(); void setCompactionPeriod(const std::shared_ptr<Configure> &configure); - bool ExecuteWithRetry(const std::function<rocksdb::Status()>& operation); - - void initialize_repository(); - void deserializeFlowFilesWithNoContentClaim(minifi::internal::OpenRocksDb& opendb, std::list<ExpiredFlowFileInfo>& flow_files); - std::thread& getThread() override { - return thread_; - } - moodycamel::ConcurrentQueue<ExpiredFlowFileInfo> keys_to_delete_; std::shared_ptr<core::ContentRepository> content_repo_; - std::unique_ptr<minifi::internal::RocksDatabase> db_; std::unique_ptr<FlowFileLoader> swap_loader_; - std::shared_ptr<logging::Logger> logger_; std::shared_ptr<minifi::Configure> config_; - std::thread thread_; std::chrono::milliseconds compaction_period_; std::unique_ptr<utils::StoppableThread> compaction_thread_; diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp index 2680de9d5..15073835e 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.cpp +++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp @@ -23,20 +23,6 @@ namespace org::apache::nifi::minifi::provenance { -void ProvenanceRepository::printStats() { - std::string key_count; - db_->GetProperty("rocksdb.estimate-num-keys", &key_count); - - std::string table_readers; - db_->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); - - std::string all_memtables; - db_->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); - - logger_->log_info("Repository stats: key count: %s, table readers size: %s, all memory tables size: %s", - key_count, table_readers, all_memtables); -} - void ProvenanceRepository::run() { size_t count = 0; while (isRunning()) { @@ -66,64 +52,45 @@ bool ProvenanceRepository::initialize(const std::shared_ptr<org::apache::nifi::m } logger_->log_debug("MiNiFi Provenance Max Storage Time: [%" PRId64 "] ms", int64_t{max_partition_millis_.count()}); - rocksdb::Options options; - options.create_if_missing = true; - options.use_direct_io_for_flush_and_compaction = true; - options.use_direct_reads = true; - // Rocksdb write buffers act as a log of database operation: grow till reaching the limit, serialized after - // This shouldn't go above 16MB and the configured total size of the db should cap it as well - int64_t max_buffer_size = 16 << 20; - options.write_buffer_size = gsl::narrow<size_t>(std::min(max_buffer_size, max_partition_bytes_)); - options.max_write_buffer_number = 4; - options.min_write_buffer_number_to_merge = 1; - options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleFIFO; - options.compaction_options_fifo = rocksdb::CompactionOptionsFIFO(max_partition_bytes_, false); - if (max_partition_millis_ > std::chrono::milliseconds(0)) { - options.ttl = std::chrono::duration_cast<std::chrono::seconds>(max_partition_millis_).count(); - } + auto db_options = [] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) { + db_opts.set(&rocksdb::DBOptions::create_if_missing, true); + db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true); + db_opts.set(&rocksdb::DBOptions::use_direct_reads, true); + }; - logger_->log_info("Write buffer: %llu", options.write_buffer_size); - logger_->log_info("Max partition bytes: %llu", max_partition_bytes_); - logger_->log_info("Ttl: %llu", options.ttl); + // Rocksdb write buffers act as a log of database operation: grow till reaching the limit, serialized after + // This shouldn't go above 16MB and the configured total size of the db should cap it as well + auto cf_options = [this] (rocksdb::ColumnFamilyOptions& cf_opts) { + int64_t max_buffer_size = 16 << 20; + cf_opts.write_buffer_size = gsl::narrow<size_t>(std::min(max_buffer_size, max_partition_bytes_)); + cf_opts.max_write_buffer_number = 4; + cf_opts.min_write_buffer_number_to_merge = 1; + + cf_opts.compaction_style = rocksdb::CompactionStyle::kCompactionStyleFIFO; + cf_opts.compaction_options_fifo = rocksdb::CompactionOptionsFIFO(max_partition_bytes_, false); + if (max_partition_millis_ > std::chrono::milliseconds(0)) { + cf_opts.ttl = std::chrono::duration_cast<std::chrono::seconds>(max_partition_millis_).count(); + } + }; - rocksdb::DB* db; - rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db); - if (status.ok()) { + db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, directory_); + if (db_->open()) { logger_->log_debug("MiNiFi Provenance Repository database open %s success", directory_); - db_.reset(db); } else { - logger_->log_error("MiNiFi Provenance Repository database open %s failed: %s", directory_, status.ToString()); + logger_->log_error("MiNiFi Provenance Repository database open %s failed", directory_); return false; } return true; } -bool ProvenanceRepository::Put(const std::string& key, const uint8_t *buf, size_t bufLen) { - // persist to the DB - rocksdb::Slice value((const char *) buf, bufLen); - return db_->Put(rocksdb::WriteOptions(), key, value).ok(); -} - -bool ProvenanceRepository::MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) { - rocksdb::WriteBatch batch; - for (const auto &item : data) { - const auto buf = item.second->getBuffer().as_span<const char>(); - rocksdb::Slice value(buf.data(), buf.size()); - if (!batch.Put(item.first, value).ok()) { - return false; - } - } - return db_->Write(rocksdb::WriteOptions(), &batch).ok(); -} - -bool ProvenanceRepository::Get(const std::string &key, std::string &value) { - return db_->Get(rocksdb::ReadOptions(), key, &value).ok(); -} - bool ProvenanceRepository::getElements(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size) { - std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions())); + auto opendb = db_->open(); + if (!opendb) { + return false; + } + std::unique_ptr<rocksdb::Iterator> it(opendb->NewIterator(rocksdb::ReadOptions())); size_t requested_batch = max_size; max_size = 0; for (it->SeekToFirst(); it->Valid(); it->Next()) { @@ -144,13 +111,6 @@ void ProvenanceRepository::destroy() { db_.reset(); } -uint64_t ProvenanceRepository::getKeyCount() const { - std::string key_count; - db_->GetProperty("rocksdb.estimate-num-keys", &key_count); - - return std::stoull(key_count); -} - REGISTER_RESOURCE_AS(ProvenanceRepository, InternalResource, ("ProvenanceRepository", "provenancerepository")); } // namespace org::apache::nifi::minifi::provenance diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h index 7aaeaa34e..ca0b6a8ad 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.h +++ b/extensions/rocksdb-repos/ProvenanceRepository.h @@ -23,14 +23,13 @@ #include <algorithm> #include <utility> -#include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" #include "core/Core.h" #include "core/logging/LoggerConfiguration.h" -#include "core/ThreadedRepository.h" #include "provenance/Provenance.h" #include "utils/Literals.h" +#include "RocksDbRepository.h" namespace org::apache::nifi::minifi::provenance { @@ -39,7 +38,7 @@ constexpr auto MAX_PROVENANCE_STORAGE_SIZE = 10_MiB; constexpr auto MAX_PROVENANCE_ENTRY_LIFE_TIME = std::chrono::minutes(1); constexpr auto PROVENANCE_PURGE_PERIOD = std::chrono::milliseconds(2500); -class ProvenanceRepository : public core::ThreadedRepository { +class ProvenanceRepository : public core::repository::RocksDbRepository { public: ProvenanceRepository(std::string name, const utils::Identifier& /*uuid*/) : ProvenanceRepository(std::move(name)) { @@ -50,8 +49,8 @@ class ProvenanceRepository : public core::ThreadedRepository { std::chrono::milliseconds maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE, std::chrono::milliseconds purgePeriod = PROVENANCE_PURGE_PERIOD) - : ThreadedRepository(repo_name.length() > 0 ? std::move(repo_name) : core::getClassName<ProvenanceRepository>(), - directory, maxPartitionMillis, maxPartitionBytes, purgePeriod) { + : RocksDbRepository(repo_name.length() > 0 ? std::move(repo_name) : core::getClassName<ProvenanceRepository>(), + directory, maxPartitionMillis, maxPartitionBytes, purgePeriod, core::logging::LoggerFactory<ProvenanceRepository>::getLogger()) { } ~ProvenanceRepository() override { @@ -62,27 +61,19 @@ class ProvenanceRepository : public core::ThreadedRepository { EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; - void printStats(); - bool isNoop() const override { return false; } bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) override; - bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override; - bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override; - bool Delete(const std::string& /*key*/) override { // The repo is cleaned up by itself, there is no need to delete items. return true; } - - bool Get(const std::string &key, std::string &value) override; bool getElements(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size) override; void destroy(); - uint64_t getKeyCount() const; // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer @@ -93,14 +84,6 @@ class ProvenanceRepository : public core::ThreadedRepository { private: // Run function for the thread void run() override; - - std::thread& getThread() override { - return thread_; - } - - std::unique_ptr<rocksdb::DB> db_; - std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ProvenanceRepository>::getLogger(); - std::thread thread_; }; } // namespace org::apache::nifi::minifi::provenance diff --git a/extensions/rocksdb-repos/RocksDbRepository.cpp b/extensions/rocksdb-repos/RocksDbRepository.cpp new file mode 100644 index 000000000..02a735395 --- /dev/null +++ b/extensions/rocksdb-repos/RocksDbRepository.cpp @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "RocksDbRepository.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::core::repository { + +void RocksDbRepository::printStats() { + auto opendb = db_->open(); + if (!opendb) { + return; + } + std::string key_count; + opendb->GetProperty("rocksdb.estimate-num-keys", &key_count); + + std::string table_readers; + opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); + + std::string all_memtables; + opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); + + logger_->log_info("Repository stats: key count: %s, table readers size: %s, all memory tables size: %s", + key_count, table_readers, all_memtables); +} + +bool RocksDbRepository::ExecuteWithRetry(const std::function<rocksdb::Status()>& operation) { + constexpr int RETRY_COUNT = 3; + std::chrono::milliseconds wait_time = 0ms; + for (int i=0; i < RETRY_COUNT; ++i) { + auto status = operation(); + if (status.ok()) { + logger_->log_trace("Rocksdb operation executed successfully"); + return true; + } + logger_->log_error("Rocksdb operation failed: %s", status.ToString()); + wait_time += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS; + std::this_thread::sleep_for(wait_time); + } + return false; +} + +bool RocksDbRepository::Put(const std::string& key, const uint8_t *buf, size_t bufLen) { + auto opendb = db_->open(); + if (!opendb) { + return false; + } + rocksdb::Slice value((const char *) buf, bufLen); + auto operation = [&key, &value, &opendb]() { return opendb->Put(rocksdb::WriteOptions(), key, value); }; + return ExecuteWithRetry(operation); +} + +bool RocksDbRepository::MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) { + auto opendb = db_->open(); + if (!opendb) { + return false; + } + auto batch = opendb->createWriteBatch(); + for (const auto &item : data) { + const auto buf = item.second->getBuffer().as_span<const char>(); + rocksdb::Slice value(buf.data(), buf.size()); + if (!batch.Put(item.first, value).ok()) { + logger_->log_error("Failed to add item to batch operation"); + return false; + } + } + auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); }; + return ExecuteWithRetry(operation); +} + +bool RocksDbRepository::Get(const std::string &key, std::string &value) { + auto opendb = db_->open(); + if (!opendb) { + return false; + } + return opendb->Get(rocksdb::ReadOptions(), key, &value).ok(); +} + +uint64_t RocksDbRepository::getRepositorySize() const { + return (utils::optional_from_ptr(db_.get()) | + utils::flatMap([](const auto& db) { return db->open(); }) | + utils::flatMap([](const auto& opendb) { return opendb.getApproximateSizes(); })).value_or(0); +} + +uint64_t RocksDbRepository::getRepositoryEntryCount() const { + return (utils::optional_from_ptr(db_.get()) | + utils::flatMap([](const auto& db) { return db->open(); }) | + utils::flatMap([](auto&& opendb) -> std::optional<uint64_t> { + std::string key_count; + opendb.GetProperty("rocksdb.estimate-num-keys", &key_count); + if (!key_count.empty()) { + return std::stoull(key_count); + } + return std::nullopt; + })).value_or(0); +} + +} // namespace org::apache::nifi::minifi::core::repository diff --git a/extensions/rocksdb-repos/RocksDbRepository.h b/extensions/rocksdb-repos/RocksDbRepository.h new file mode 100644 index 000000000..6fd220d8d --- /dev/null +++ b/extensions/rocksdb-repos/RocksDbRepository.h @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <utility> +#include <vector> +#include <string> +#include <memory> + +#include "database/RocksDatabase.h" +#include "core/ThreadedRepository.h" + +namespace org::apache::nifi::minifi::core::repository { + +constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = std::chrono::milliseconds(500); + +class RocksDbRepository : public ThreadedRepository { + public: + RocksDbRepository(std::string repo_name, + std::string directory, + std::chrono::milliseconds max_partition_millis, + int64_t max_partition_bytes, + std::chrono::milliseconds purge_period, + std::shared_ptr<logging::Logger> logger) + : ThreadedRepository(std::move(repo_name), std::move(directory), max_partition_millis, max_partition_bytes, purge_period), + logger_(std::move(logger)) { + } + + bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override; + bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override; + bool Get(const std::string &key, std::string &value) override; + + uint64_t getRepositorySize() const override; + uint64_t getRepositoryEntryCount() const override; + void printStats(); + + protected: + bool ExecuteWithRetry(const std::function<rocksdb::Status()>& operation); + + std::thread& getThread() override { + return thread_; + } + + std::unique_ptr<minifi::internal::RocksDatabase> db_; + std::shared_ptr<logging::Logger> logger_; + std::thread thread_; +}; + +} // namespace org::apache::nifi::minifi::core::repository diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp b/extensions/rocksdb-repos/database/OpenRocksDb.cpp index be5f1960c..703c66a39 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp +++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp @@ -122,4 +122,17 @@ rocksdb::DB* OpenRocksDb::get() { return impl_.get(); } +std::optional<uint64_t> OpenRocksDb::getApproximateSizes() const { + const rocksdb::SizeApproximationOptions options{ .include_memtables = true }; + std::string del_char_str(1, static_cast<char>(127)); + std::string empty_str; + const rocksdb::Range range(empty_str, del_char_str); + uint64_t value = 0; + auto status = impl_->GetApproximateSizes(options, column_->handle.get(), &range, 1, &value); + if (status.ok()) { + return value; + } + return std::nullopt; +} + } // namespace org::apache::nifi::minifi::internal diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.h b/extensions/rocksdb-repos/database/OpenRocksDb.h index ebe25aacc..b4d964381 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.h +++ b/extensions/rocksdb-repos/database/OpenRocksDb.h @@ -21,6 +21,7 @@ #include <memory> #include <string> #include <vector> +#include <optional> #include "utils/gsl.h" #include "rocksdb/db.h" @@ -71,6 +72,8 @@ class OpenRocksDb { rocksdb::DB* get(); + std::optional<uint64_t> getApproximateSizes() const; + private: void handleResult(const rocksdb::Status& result); void handleResult(const std::vector<rocksdb::Status>& results); diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h index 17762af26..68230d42b 100644 --- a/libminifi/include/core/ContentRepository.h +++ b/libminifi/include/core/ContentRepository.h @@ -26,22 +26,20 @@ #include "properties/Configure.h" #include "ResourceClaim.h" #include "StreamManager.h" -#include "core/Core.h" #include "ContentSession.h" +#include "core/RepositoryMetricsSource.h" +#include "core/Core.h" namespace org::apache::nifi::minifi::core { /** * Content repository definition that extends StreamManager. */ -class ContentRepository : public StreamManager<minifi::ResourceClaim>, public utils::EnableSharedFromThis<ContentRepository>, public core::CoreComponent { +class ContentRepository : public core::CoreComponent, public StreamManager<minifi::ResourceClaim>, public utils::EnableSharedFromThis<ContentRepository>, public core::RepositoryMetricsSource { public: explicit ContentRepository(std::string name, const utils::Identifier& uuid = {}) : core::CoreComponent(std::move(name), uuid) {} ~ContentRepository() override = default; - /** - * initialize this content repository using the provided configuration. - */ virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0; std::string getStoragePath() const override; @@ -59,6 +57,10 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>, public ut bool remove(const minifi::ResourceClaim &streamId) final; + std::string getRepositoryName() const override { + return getName(); + } + protected: void removeFromPurgeList(); virtual bool removeKey(const std::string& content_path) = 0; diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h index 77fdfbec3..0c74732bc 100644 --- a/libminifi/include/core/Repository.h +++ b/libminifi/include/core/Repository.h @@ -29,19 +29,20 @@ #include <utility> #include <vector> -#include "Core.h" #include "ResourceClaim.h" #include "core/Connectable.h" #include "core/ContentRepository.h" #include "core/Property.h" #include "core/SerializableComponent.h" #include "core/logging/LoggerFactory.h" +#include "core/RepositoryMetricsSource.h" #include "properties/Configure.h" #include "utils/BackTrace.h" #include "SwapManager.h" #include "utils/Literals.h" #include "utils/StringUtils.h" #include "utils/TimeUtil.h" +#include "core/Core.h" #ifndef WIN32 #include <sys/stat.h> @@ -54,7 +55,7 @@ constexpr auto MAX_REPOSITORY_STORAGE_SIZE = 10_MiB; constexpr auto MAX_REPOSITORY_ENTRY_LIFE_TIME = std::chrono::minutes(10); constexpr auto REPOSITORY_PURGE_PERIOD = std::chrono::milliseconds(2500); -class Repository : public core::CoreComponent { +class Repository : public core::CoreComponent, public core::RepositoryMetricsSource { public: explicit Repository(std::string repo_name = "Repository", std::string directory = REPOSITORY_DIRECTORY, @@ -62,20 +63,15 @@ class Repository : public core::CoreComponent { int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE, std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD) : core::CoreComponent(std::move(repo_name)), - directory_(std::move(directory)), max_partition_millis_(maxPartitionMillis), max_partition_bytes_(maxPartitionBytes), purge_period_(purgePeriod), repo_full_(false), - repo_size_(0), + directory_(std::move(directory)), logger_(logging::LoggerFactory<Repository>::getLogger()) { } - virtual bool isRunning() const { - return true; - } - - virtual bool initialize(const std::shared_ptr<Configure>& /*configure*/) = 0; + virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0; virtual bool start() = 0; virtual bool stop() = 0; @@ -116,10 +112,6 @@ class Repository : public core::CoreComponent { return false; } - virtual bool isFull() { - return repo_full_; - } - virtual bool getElements(std::vector<std::shared_ptr<core::SerializableComponent>>& /*store*/, size_t& /*max_size*/) { return true; } @@ -129,14 +121,14 @@ class Repository : public core::CoreComponent { virtual void loadComponent(const std::shared_ptr<core::ContentRepository>& /*content_repo*/) { } - virtual uint64_t getRepoSize() const { - return repo_size_; - } - std::string getDirectory() const { return directory_; } + std::string getRepositoryName() const override { + return getName(); + } + // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer Repository(const Repository &parent) = delete; @@ -146,15 +138,13 @@ class Repository : public core::CoreComponent { std::map<std::string, core::Connectable*> containers_; std::map<std::string, core::Connectable*> connection_map_; - std::string directory_; // max db entry lifetime std::chrono::milliseconds max_partition_millis_; // max db size int64_t max_partition_bytes_; std::chrono::milliseconds purge_period_; std::atomic<bool> repo_full_; - - std::atomic<uint64_t> repo_size_; + std::string directory_; private: std::shared_ptr<logging::Logger> logger_; diff --git a/libminifi/include/core/RepositoryFactory.h b/libminifi/include/core/RepositoryFactory.h index 60ba5775a..352dec51f 100644 --- a/libminifi/include/core/RepositoryFactory.h +++ b/libminifi/include/core/RepositoryFactory.h @@ -16,8 +16,7 @@ * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_ -#define LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_ +#pragma once #include <memory> #include <string> @@ -45,5 +44,3 @@ std::unique_ptr<core::ContentRepository> createContentRepository(const std::stri std::unique_ptr<core::Repository> createRepository(const std::string& configuration_class_name, const std::string& repo_name = ""); } // namespace org::apache::nifi::minifi::core - -#endif // LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_ diff --git a/libminifi/include/core/RepositoryMetricsSource.h b/libminifi/include/core/RepositoryMetricsSource.h new file mode 100644 index 000000000..cfaeb4ae6 --- /dev/null +++ b/libminifi/include/core/RepositoryMetricsSource.h @@ -0,0 +1,45 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> + +namespace org::apache::nifi::minifi::core { + +class RepositoryMetricsSource { + public: + virtual ~RepositoryMetricsSource() = default; + virtual uint64_t getRepositorySize() const = 0; + virtual uint64_t getRepositoryEntryCount() const = 0; + virtual std::string getRepositoryName() const = 0; + + virtual uint64_t getMaxRepositorySize() const { + return 0; + } + + virtual bool isFull() const { + return false; + } + + virtual bool isRunning() const { + return true; + } +}; + +} // namespace org::apache::nifi::minifi::core diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h index b0c3900cd..99a26b4e3 100644 --- a/libminifi/include/core/repository/FileSystemRepository.h +++ b/libminifi/include/core/repository/FileSystemRepository.h @@ -21,10 +21,12 @@ #include <memory> #include <string> #include <utility> +#include <algorithm> #include "../ContentRepository.h" #include "properties/Configure.h" #include "core/logging/LoggerFactory.h" +#include "utils/file/FileUtils.h" namespace org::apache::nifi::minifi::core::repository { @@ -50,6 +52,18 @@ class FileSystemRepository : public core::ContentRepository { void clearOrphans() override; + uint64_t getRepositorySize() const override { + return utils::file::path_size(directory_); + } + + uint64_t getRepositoryEntryCount() const override { + auto dir_it = std::filesystem::recursive_directory_iterator(directory_, std::filesystem::directory_options::skip_permission_denied); + return std::count_if( + std::filesystem::begin(dir_it), + std::filesystem::end(dir_it), + [](auto& entry) { return entry.is_regular_file(); }); + } + protected: bool removeKey(const std::string& content_path) override; diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h index a5f6027e9..0690255f7 100644 --- a/libminifi/include/core/repository/VolatileContentRepository.h +++ b/libminifi/include/core/repository/VolatileContentRepository.h @@ -24,12 +24,12 @@ #include "AtomicRepoEntries.h" #include "io/AtomicEntryStream.h" #include "../ContentRepository.h" -#include "core/repository/VolatileRepository.h" #include "properties/Configure.h" #include "core/Connectable.h" #include "core/logging/LoggerFactory.h" #include "utils/GeneralUtils.h" #include "VolatileRepositoryData.h" +#include "utils/Literals.h" namespace org::apache::nifi::minifi::core::repository { /** @@ -58,6 +58,22 @@ class VolatileContentRepository : public core::ContentRepository { } } + uint64_t getRepositorySize() const override { + return repo_data_.getRepositorySize(); + } + + uint64_t getMaxRepositorySize() const override { + return repo_data_.getMaxRepositorySize(); + } + + uint64_t getRepositoryEntryCount() const override { + return master_list_.size(); + } + + bool isFull() const override { + return repo_data_.isFull(); + } + /** * Initialize the volatile content repo * @param configure configuration diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h index 6277b2a12..f22ace0ad 100644 --- a/libminifi/include/core/repository/VolatileRepository.h +++ b/libminifi/include/core/repository/VolatileRepository.h @@ -81,8 +81,20 @@ class VolatileRepository : public core::ThreadedRepository { */ bool Get(const std::string& key, std::string &value) override; - uint64_t getRepoSize() const override { - return repo_data_.current_size; + uint64_t getRepositorySize() const override { + return repo_data_.getRepositorySize(); + } + + uint64_t getMaxRepositorySize() const override { + return repo_data_.getMaxRepositorySize(); + } + + uint64_t getRepositoryEntryCount() const override { + return repo_data_.getRepositoryEntryCount(); + } + + bool isFull() const override { + return repo_data_.isFull(); } protected: diff --git a/libminifi/include/core/repository/VolatileRepositoryData.h b/libminifi/include/core/repository/VolatileRepositoryData.h index a428126c5..86d8161b7 100644 --- a/libminifi/include/core/repository/VolatileRepositoryData.h +++ b/libminifi/include/core/repository/VolatileRepositoryData.h @@ -35,8 +35,26 @@ struct VolatileRepositoryData { void initialize(const std::shared_ptr<Configure> &configure, const std::string& repo_name); void clear(); + + uint64_t getRepositorySize() const { + return current_size; + } + + uint64_t getMaxRepositorySize() const { + return max_size; + } + + uint64_t getRepositoryEntryCount() const { + return current_entry_count; + } + + bool isFull() const { + return current_size >= max_size; + } + // current size of the volatile repo. std::atomic<size_t> current_size; + std::atomic<size_t> current_entry_count; // value vector that exists for non blocking iteration over // objects that store data for this repo instance. std::vector<AtomicEntry<std::string>*> value_vector; diff --git a/libminifi/include/core/state/MetricsPublisherStore.h b/libminifi/include/core/state/MetricsPublisherStore.h index 90eab8aa1..b4fafaa92 100644 --- a/libminifi/include/core/state/MetricsPublisherStore.h +++ b/libminifi/include/core/state/MetricsPublisherStore.h @@ -21,6 +21,7 @@ #include <string> #include <utility> #include <memory> +#include <vector> #include "MetricsPublisher.h" #include "core/state/nodes/ResponseNodeLoader.h" @@ -31,8 +32,8 @@ namespace org::apache::nifi::minifi::state { class MetricsPublisherStore { public: - MetricsPublisherStore(std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo, - std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::FlowConfiguration> flow_configuration); + MetricsPublisherStore(std::shared_ptr<Configure> configuration, const std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& repository_metric_sources, + std::shared_ptr<core::FlowConfiguration> flow_configuration); void initialize(core::controller::ControllerServiceProvider* controller, state::StateMonitor* update_sink); void loadMetricNodes(core::ProcessGroup* root); void clearMetricNodes(); diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h index d38465295..0d18c721e 100644 --- a/libminifi/include/core/state/nodes/AgentInformation.h +++ b/libminifi/include/core/state/nodes/AgentInformation.h @@ -61,6 +61,7 @@ #include "core/AgentIdentificationProvider.h" #include "utils/Export.h" #include "SupportedOperations.h" +#include "core/RepositoryMetricsSource.h" namespace org::apache::nifi::minifi::state::response { @@ -423,13 +424,13 @@ class AgentStatus : public StateMonitorNode { return "AgentStatus"; } - void setRepositories(const std::map<std::string, std::shared_ptr<core::Repository>> &repositories) { + void setRepositories(const std::map<std::string, std::shared_ptr<core::RepositoryMetricsSource>> &repositories) { repositories_ = repositories; } - void addRepository(const std::shared_ptr<core::Repository> &repo) { + void addRepository(const std::shared_ptr<core::RepositoryMetricsSource> &repo) { if (nullptr != repo) { - repositories_.insert(std::make_pair(repo->getName(), repo)); + repositories_.insert(std::make_pair(repo->getRepositoryName(), repo)); } } @@ -454,9 +455,11 @@ class AgentStatus : public StateMonitorNode { std::vector<PublishedMetric> calculateMetrics() override { std::vector<PublishedMetric> metrics; for (const auto& [_, repo] : repositories_) { - metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getName()}}}); - metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getName()}}}); - metrics.push_back({"repository_size", static_cast<double>(repo->getRepoSize()), {{"metric_class", getName()}, {"repository_name", repo->getName()}}}); + metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"repository_size_bytes", static_cast<double>(repo->getRepositorySize()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"max_repository_size_bytes", static_cast<double>(repo->getMaxRepositorySize()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"repository_entry_count", static_cast<double>(repo->getRepositoryEntryCount()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); } if (nullptr != monitor_) { auto uptime = monitor_->getUptime(); @@ -488,26 +491,36 @@ class AgentStatus : public StateMonitorNode { repositories.name = "repositories"; for (const auto& repo : repositories_) { - SerializedResponseNode repoNode; - repoNode.collapsible = false; - repoNode.name = repo.first; - - SerializedResponseNode queuesize; - queuesize.name = "size"; - queuesize.value = repo.second->getRepoSize(); - - SerializedResponseNode isRunning; - isRunning.name = "running"; - isRunning.value = repo.second->isRunning(); - - SerializedResponseNode isFull; - isFull.name = "full"; - isFull.value = repo.second->isFull(); - - repoNode.children.push_back(queuesize); - repoNode.children.push_back(isRunning); - repoNode.children.push_back(isFull); - repositories.children.push_back(repoNode); + SerializedResponseNode repo_node; + repo_node.collapsible = false; + repo_node.name = repo.first; + + SerializedResponseNode repo_size; + repo_size.name = "size"; + repo_size.value = repo.second->getRepositorySize(); + + SerializedResponseNode max_repo_size; + max_repo_size.name = "maxSize"; + max_repo_size.value = repo.second->getMaxRepositorySize(); + + SerializedResponseNode repo_entry_count; + repo_entry_count.name = "entryCount"; + repo_entry_count.value = repo.second->getRepositoryEntryCount(); + + SerializedResponseNode is_running; + is_running.name = "running"; + is_running.value = repo.second->isRunning(); + + SerializedResponseNode is_full; + is_full.name = "full"; + is_full.value = repo.second->isFull(); + + repo_node.children.push_back(repo_size); + repo_node.children.push_back(max_repo_size); + repo_node.children.push_back(repo_entry_count); + repo_node.children.push_back(is_running); + repo_node.children.push_back(is_full); + repositories.children.push_back(repo_node); } return repositories; } @@ -578,7 +591,7 @@ class AgentStatus : public StateMonitorNode { return resource_consumption; } - std::map<std::string, std::shared_ptr<core::Repository>> repositories_; + std::map<std::string, std::shared_ptr<core::RepositoryMetricsSource>> repositories_; MINIFIAPI static utils::ProcessCpuUsageTracker cpu_load_tracker_; MINIFIAPI static std::mutex cpu_load_tracker_mutex_; @@ -608,9 +621,9 @@ class AgentMonitor { AgentMonitor() : monitor_(nullptr) { } - void addRepository(const std::shared_ptr<core::Repository> &repo) { + void addRepository(const std::shared_ptr<core::RepositoryMetricsSource> &repo) { if (nullptr != repo) { - repositories_.insert(std::make_pair(repo->getName(), repo)); + repositories_.insert(std::make_pair(repo->getRepositoryName(), repo)); } } @@ -619,7 +632,7 @@ class AgentMonitor { } protected: - std::map<std::string, std::shared_ptr<core::Repository>> repositories_; + std::map<std::string, std::shared_ptr<core::RepositoryMetricsSource>> repositories_; state::StateMonitor* monitor_ = nullptr; }; diff --git a/libminifi/include/core/state/nodes/RepositoryMetrics.h b/libminifi/include/core/state/nodes/RepositoryMetrics.h index 599730bef..bc8dada72 100644 --- a/libminifi/include/core/state/nodes/RepositoryMetrics.h +++ b/libminifi/include/core/state/nodes/RepositoryMetrics.h @@ -26,6 +26,7 @@ #include "../nodes/MetricsBase.h" #include "Connection.h" +#include "core/RepositoryMetricsSource.h" namespace org::apache::nifi::minifi::state::response { @@ -54,33 +55,42 @@ class RepositoryMetrics : public ResponseNode { return "RepositoryMetrics"; } - void addRepository(const std::shared_ptr<core::Repository> &repo) { + void addRepository(const std::shared_ptr<core::RepositoryMetricsSource> &repo) { if (nullptr != repo) { - repositories_.insert(std::make_pair(repo->getName(), repo)); + repositories_.push_back(repo); } } std::vector<SerializedResponseNode> serialize() override { std::vector<SerializedResponseNode> serialized; - for (auto conn : repositories_) { - auto repo = conn.second; + for (const auto& repo : repositories_) { SerializedResponseNode parent; - parent.name = repo->getName(); - SerializedResponseNode datasize; - datasize.name = "running"; - datasize.value = repo->isRunning(); + parent.name = repo->getRepositoryName(); + SerializedResponseNode is_running; + is_running.name = "running"; + is_running.value = repo->isRunning(); - SerializedResponseNode datasizemax; - datasizemax.name = "full"; - datasizemax.value = repo->isFull(); + SerializedResponseNode is_full; + is_full.name = "full"; + is_full.value = repo->isFull(); - SerializedResponseNode queuesize; - queuesize.name = "size"; - queuesize.value = std::to_string(repo->getRepoSize()); + SerializedResponseNode repo_size; + repo_size.name = "size"; + repo_size.value = std::to_string(repo->getRepositorySize()); - parent.children.push_back(datasize); - parent.children.push_back(datasizemax); - parent.children.push_back(queuesize); + SerializedResponseNode max_repo_size; + max_repo_size.name = "maxSize"; + max_repo_size.value = std::to_string(repo->getMaxRepositorySize()); + + SerializedResponseNode repo_entry_count; + repo_entry_count.name = "entryCount"; + repo_entry_count.value = repo->getRepositoryEntryCount(); + + parent.children.push_back(is_running); + parent.children.push_back(is_full); + parent.children.push_back(repo_size); + parent.children.push_back(max_repo_size); + parent.children.push_back(repo_entry_count); serialized.push_back(parent); } @@ -89,16 +99,18 @@ class RepositoryMetrics : public ResponseNode { std::vector<PublishedMetric> calculateMetrics() override { std::vector<PublishedMetric> metrics; - for (const auto& [_, repo] : repositories_) { - metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getName()}}}); - metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getName()}}}); - metrics.push_back({"repository_size", static_cast<double>(repo->getRepoSize()), {{"metric_class", getName()}, {"repository_name", repo->getName()}}}); + for (const auto& repo : repositories_) { + metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"repository_size_bytes", static_cast<double>(repo->getRepositorySize()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"max_repository_size_bytes", static_cast<double>(repo->getMaxRepositorySize()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"repository_entry_count", static_cast<double>(repo->getRepositoryEntryCount()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); } return metrics; } protected: - std::map<std::string, std::shared_ptr<core::Repository>> repositories_; + std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repositories_; }; } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h index 719d2e9a5..af330585c 100644 --- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h +++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h @@ -33,13 +33,14 @@ #include "utils/gsl.h" #include "utils/Id.h" #include "utils/expected.h" +#include "core/RepositoryMetricsSource.h" namespace org::apache::nifi::minifi::state::response { class ResponseNodeLoader { public: - ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo, - std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::FlowConfiguration> flow_configuration); + ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources, + std::shared_ptr<core::FlowConfiguration> flow_configuration); void setNewConfigRoot(core::ProcessGroup* root); void clearConfigRoot(); @@ -70,8 +71,7 @@ class ResponseNodeLoader { std::unordered_map<std::string, std::vector<SharedResponseNode>> component_metrics_; std::unordered_map<std::string, SharedResponseNode> system_metrics_; std::shared_ptr<Configure> configuration_; - std::shared_ptr<core::Repository> provenance_repo_; - std::shared_ptr<core::Repository> flow_file_repo_; + std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources_; std::shared_ptr<core::FlowConfiguration> flow_configuration_; core::controller::ControllerServiceProvider* controller_{}; state::StateMonitor* update_sink_{}; diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h index db37d861b..ba778f583 100644 --- a/libminifi/include/utils/file/FileUtils.h +++ b/libminifi/include/utils/file/FileUtils.h @@ -206,6 +206,20 @@ inline bool is_directory(const std::filesystem::path &path) { return false; } +inline uint64_t path_size(const std::filesystem::path& path) { + uint64_t size = 0; + if (std::filesystem::is_regular_file(path)) { + return utils::file::file_size(path); + } else if (utils::file::is_directory(path)) { + for (const std::filesystem::directory_entry& entry : std::filesystem::recursive_directory_iterator(path, std::filesystem::directory_options::skip_permission_denied)) { + if (entry.is_regular_file()) { + size += entry.file_size(); + } + } + } + return size; +} + inline bool exists(const std::filesystem::path &path) { std::error_code ec; bool result = std::filesystem::exists(path, ec); diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp index 8455043cb..bc79e136d 100644 --- a/libminifi/src/core/RepositoryFactory.cpp +++ b/libminifi/src/core/RepositoryFactory.cpp @@ -37,6 +37,7 @@ std::unique_ptr<core::ContentRepository> createContentRepository(const std::stri auto return_obj = core::ClassLoader::getDefaultClassLoader().instantiate<core::ContentRepository>(class_name_lc, class_name_lc); if (return_obj) { + return_obj->setName(repo_name); return return_obj; } if (class_name_lc == "volatilecontentrepository") { @@ -68,6 +69,14 @@ class NoOpThreadedRepository : public core::ThreadedRepository { stop(); } + uint64_t getRepositorySize() const override { + return 0; + } + + uint64_t getRepositoryEntryCount() const override { + return 0; + } + private: void run() override { } diff --git a/libminifi/src/core/repository/VolatileRepository.cpp b/libminifi/src/core/repository/VolatileRepository.cpp index 5171376cd..161f9f357 100644 --- a/libminifi/src/core/repository/VolatileRepository.cpp +++ b/libminifi/src/core/repository/VolatileRepository.cpp @@ -71,6 +71,9 @@ bool VolatileRepository::Put(const std::string& key, const uint8_t *buf, size_t } } while (!updated); repo_data_.current_size += size; + if (repo_data_.current_entry_count < repo_data_.max_count) { + ++repo_data_.current_entry_count; + } logger_->log_debug("VolatileRepository -- put %zu %" PRIu32, repo_data_.current_size.load(), current_index_.load()); return true; @@ -92,6 +95,7 @@ bool VolatileRepository::Delete(const std::string& key) { RepoValue<std::string> value; if (ent->getValue(key, value)) { repo_data_.current_size -= value.size(); + --repo_data_.current_entry_count; logger_->log_debug("Delete and pushed into purge_list from volatile"); emplace(value); return true; @@ -102,10 +106,8 @@ bool VolatileRepository::Delete(const std::string& key) { bool VolatileRepository::Get(const std::string &key, std::string &value) { for (auto ent : repo_data_.value_vector) { - // let the destructor do the cleanup RepoValue<std::string> repo_value; if (ent->getValue(key, repo_value)) { - repo_data_.current_size -= value.size(); repo_value.emplace(value); return true; } diff --git a/libminifi/src/core/repository/VolatileRepositoryData.cpp b/libminifi/src/core/repository/VolatileRepositoryData.cpp index a5ab41aff..09633ff40 100644 --- a/libminifi/src/core/repository/VolatileRepositoryData.cpp +++ b/libminifi/src/core/repository/VolatileRepositoryData.cpp @@ -23,6 +23,7 @@ namespace org::apache::nifi::minifi::core::repository { VolatileRepositoryData::VolatileRepositoryData(uint32_t max_count, size_t max_size) : current_size(0), + current_entry_count(0), max_count(max_count), max_size(max_size) { } diff --git a/libminifi/src/core/state/MetricsPublisherStore.cpp b/libminifi/src/core/state/MetricsPublisherStore.cpp index d4622b8ea..c837d264a 100644 --- a/libminifi/src/core/state/MetricsPublisherStore.cpp +++ b/libminifi/src/core/state/MetricsPublisherStore.cpp @@ -22,10 +22,10 @@ namespace org::apache::nifi::minifi::state { -MetricsPublisherStore::MetricsPublisherStore(std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo, - std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::FlowConfiguration> flow_configuration) +MetricsPublisherStore::MetricsPublisherStore(std::shared_ptr<Configure> configuration, const std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& repository_metric_sources, + std::shared_ptr<core::FlowConfiguration> flow_configuration) : configuration_(configuration), - response_node_loader_(std::make_shared<response::ResponseNodeLoader>(std::move(configuration), std::move(provenance_repo), std::move(flow_file_repo), std::move(flow_configuration))) { + response_node_loader_(std::make_shared<response::ResponseNodeLoader>(std::move(configuration), repository_metric_sources, std::move(flow_configuration))) { } void MetricsPublisherStore::initialize(core::controller::ControllerServiceProvider* controller, state::StateMonitor* update_sink) { diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp index aeed88e99..66fe7f213 100644 --- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp +++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp @@ -32,12 +32,11 @@ namespace org::apache::nifi::minifi::state::response { -ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo, - std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::FlowConfiguration> flow_configuration) - : configuration_(std::move(configuration)), - provenance_repo_(std::move(provenance_repo)), - flow_file_repo_(std::move(flow_file_repo)), - flow_configuration_(std::move(flow_configuration)) { +ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources, + std::shared_ptr<core::FlowConfiguration> flow_configuration) + : configuration_(std::move(configuration)), + repository_metric_sources_(std::move(repository_metric_sources)), + flow_configuration_(std::move(flow_configuration)) { } void ResponseNodeLoader::clearConfigRoot() { @@ -124,8 +123,9 @@ std::vector<SharedResponseNode> ResponseNodeLoader::getResponseNodes(const std:: void ResponseNodeLoader::initializeRepositoryMetrics(const SharedResponseNode& response_node) const { auto repository_metrics = dynamic_cast<RepositoryMetrics*>(response_node.get()); if (repository_metrics != nullptr) { - repository_metrics->addRepository(provenance_repo_); - repository_metrics->addRepository(flow_file_repo_); + for (const auto& repo : repository_metric_sources_) { + repository_metrics->addRepository(repo); + } } } @@ -155,8 +155,9 @@ void ResponseNodeLoader::initializeAgentIdentifier(const SharedResponseNode& res void ResponseNodeLoader::initializeAgentMonitor(const SharedResponseNode& response_node) const { auto monitor = dynamic_cast<state::response::AgentMonitor*>(response_node.get()); if (monitor != nullptr) { - monitor->addRepository(provenance_repo_); - monitor->addRepository(flow_file_repo_); + for (const auto& repo : repository_metric_sources_) { + monitor->addRepository(repo); + } monitor->setStateMonitor(update_sink_); } } @@ -176,8 +177,9 @@ void ResponseNodeLoader::initializeAgentNode(const SharedResponseNode& response_ void ResponseNodeLoader::initializeAgentStatus(const SharedResponseNode& response_node) const { auto agent_status = dynamic_cast<state::response::AgentStatus*>(response_node.get()); if (agent_status != nullptr) { - agent_status->addRepository(provenance_repo_); - agent_status->addRepository(flow_file_repo_); + for (const auto& repo : repository_metric_sources_) { + agent_status->addRepository(repo); + } agent_status->setStateMonitor(update_sink_); } } diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h b/libminifi/test/flow-tests/TestControllerWithFlow.h index ced53fea9..760d903e3 100644 --- a/libminifi/test/flow-tests/TestControllerWithFlow.h +++ b/libminifi/test/flow-tests/TestControllerWithFlow.h @@ -21,6 +21,7 @@ #include <string> #include <memory> #include <utility> +#include <vector> #include "FlowController.h" #include "unit/ProvenanceTestHelper.h" @@ -68,7 +69,8 @@ class TestControllerWithFlow: public TestController { auto flow = std::make_shared<core::YamlConfiguration>(core::ConfigurationContext{ff_repo, content_repo, stream_factory, configuration_, yaml_path_.string()}); auto root = flow->getRoot(); root_ = root.get(); - auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration_, prov_repo, ff_repo, flow); + std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{prov_repo, ff_repo, content_repo}; + auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration_, repo_metric_sources, flow); metrics_publisher_store_ = metrics_publisher_store.get(); controller_ = std::make_shared<minifi::FlowController>(prov_repo, ff_repo, configuration_, std::move(flow), content_repo, std::move(metrics_publisher_store)); controller_->load(std::move(root)); diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h index 84bfedc7e..00631ffdf 100644 --- a/libminifi/test/integration/IntegrationBase.h +++ b/libminifi/test/integration/IntegrationBase.h @@ -24,6 +24,8 @@ #include <optional> #include <string> #include <utility> +#include <vector> + #include "core/logging/Logger.h" #include "core/ProcessGroup.h" #include "core/yaml/YamlConfiguration.h" @@ -195,7 +197,8 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_ running = true; }; - auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, test_repo, test_flow_repo, flow_config); + std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{test_repo, test_flow_repo, content_repo}; + auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, repo_metric_sources, flow_config); flowController_ = std::make_unique<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(flow_config), content_repo, std::move(metrics_publisher_store), filesystem, request_restart); flowController_->load(); diff --git a/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp index db437fb61..ef061d290 100644 --- a/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp +++ b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp @@ -52,7 +52,7 @@ void verifyMaxKeyCount(const minifi::provenance::ProvenanceRepository& repo, uin for (int i = 0; i < 50; ++i) { std::this_thread::sleep_for(100ms); - k = std::min(k, repo.getKeyCount()); + k = std::min(k, repo.getRepositoryEntryCount()); if (k < keyCount) { break; } diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp index 5d921fff0..4a187cca6 100644 --- a/libminifi/test/rocksdb-tests/RepoTests.cpp +++ b/libminifi/test/rocksdb-tests/RepoTests.cpp @@ -27,6 +27,7 @@ #include "core/RepositoryFactory.h" #include "FlowFileRecord.h" #include "FlowFileRepository.h" +#include "ProvenanceRepository.h" #include "provenance/Provenance.h" #include "properties/Configure.h" #include "../unit/ProvenanceTestHelper.h" @@ -34,6 +35,9 @@ #include "../Catch.h" #include "utils/gsl.h" #include "utils/IntegrationTestUtils.h" +#include "core/repository/VolatileFlowFileRepository.h" +#include "core/repository/VolatileProvenanceRepository.h" +#include "DatabaseContentRepository.h" using namespace std::literals::chrono_literals; @@ -515,4 +519,221 @@ TEST_CASE("FlowFileRepository synchronously pushes existing flow files") { } } +TEST_CASE("Test getting flow file repository size properties", "[TestGettingRepositorySize]") { + LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); + LogTestController::getInstance().setDebug<minifi::provenance::ProvenanceRepository>(); + LogTestController::getInstance().setDebug<core::repository::VolatileFlowFileRepository>(); + LogTestController::getInstance().setDebug<core::repository::VolatileProvenanceRepository>(); + TestController testController; + auto dir = testController.createTempDirectory(); + + std::shared_ptr<core::Repository> repository; + auto expected_is_full = false; + uint64_t expected_max_repo_size = 0; + SECTION("FlowFileRepository") { + repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); + } + + SECTION("ProvenanceRepository") { + repository = std::make_shared<minifi::provenance::ProvenanceRepository>("ff", dir.string(), 0ms, 0, 1ms); + } + + SECTION("VolatileFlowFileRepository") { + repository = std::make_shared<core::repository::VolatileFlowFileRepository>("ff", dir.string(), 0ms, 10, 1ms); + expected_is_full = true; + expected_max_repo_size = 7; + } + + SECTION("VolatileProvenanceRepository") { + repository = std::make_shared<core::repository::VolatileProvenanceRepository>("ff", dir.string(), 0ms, 10, 1ms); + expected_is_full = true; + expected_max_repo_size = 7; + } + auto configuration = std::make_shared<minifi::Configure>(); + repository->initialize(configuration); + + auto flow_file = std::make_shared<minifi::FlowFileRecord>(); + + for (auto i = 0; i < 100; ++i) { + flow_file->addAttribute("key" + std::to_string(i), "testattributevalue" + std::to_string(i)); + } + + auto original_size = repository->getRepositorySize(); + using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime; + REQUIRE(verifyEventHappenedInPollTime(std::chrono::seconds(5), [&original_size, &repository] { + auto old_size = original_size; + original_size = repository->getRepositorySize(); + return old_size == original_size; + }, + std::chrono::milliseconds(50))); + REQUIRE(true == flow_file->Persist(repository)); + auto flow_file_2 = std::make_shared<minifi::FlowFileRecord>(); + REQUIRE(true == flow_file_2->Persist(repository)); + + repository->flush(); + repository->stop(); + + auto new_size = repository->getRepositorySize(); + REQUIRE(verifyEventHappenedInPollTime(std::chrono::seconds(5), [&new_size, &repository] { + auto old_size = new_size; + new_size = repository->getRepositorySize(); + return old_size == new_size; + }, + std::chrono::milliseconds(50))); + REQUIRE(new_size > original_size); + REQUIRE(expected_is_full == repository->isFull()); + REQUIRE(expected_max_repo_size == repository->getMaxRepositorySize()); + REQUIRE(2 == repository->getRepositoryEntryCount()); +} + +TEST_CASE("Test getting noop repository size properties", "[TestGettingRepositorySize]") { + TestController testController; + auto dir = testController.createTempDirectory(); + + auto repository = minifi::core::createRepository("NoOpRepository", "ff"); + + repository->initialize(std::make_shared<minifi::Configure>()); + + auto flow_file = std::make_shared<minifi::FlowFileRecord>(); + + flow_file->addAttribute("key", "testattributevalue"); + + repository->flush(); + repository->stop(); + + REQUIRE(repository->getRepositorySize() == 0); + REQUIRE(!repository->isFull()); + REQUIRE(repository->getMaxRepositorySize() == 0); + REQUIRE(repository->getRepositoryEntryCount() == 0); +} + +TEST_CASE("Test getting content repository size properties", "[TestGettingRepositorySize]") { + LogTestController::getInstance().setDebug<core::ContentRepository>(); + LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>(); + LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>(); + LogTestController::getInstance().setDebug<core::repository::DatabaseContentRepository>(); + TestController testController; + auto dir = testController.createTempDirectory(); + + auto repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); + + auto content_repo_dir = testController.createTempDirectory(); + auto configuration = std::make_shared<minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_repo_dir.string()); + std::string content = "content"; + configuration->set(minifi::Configure::nifi_volatile_repository_options_content_max_bytes, std::to_string(content.size())); + + std::shared_ptr<core::ContentRepository> content_repo; + auto expected_is_full = false; + uint64_t expected_max_repo_size = 0; + SECTION("FileSystemRepository") { + content_repo = std::make_shared<core::repository::FileSystemRepository>(); + } + + SECTION("VolatileContentRepository") { + content_repo = std::make_shared<core::repository::VolatileContentRepository>("content"); + expected_is_full = true; + expected_max_repo_size = content.size(); + } + + SECTION("DatabaseContentRepository") { + content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + } + + content_repo->initialize(configuration); + + repository->initialize(configuration); + repository->loadComponent(content_repo); + auto original_content_repo_size = content_repo->getRepositorySize(); + + auto flow_file = std::make_shared<minifi::FlowFileRecord>(); + + auto content_session = content_repo->createSession(); + auto claim = content_session->create(); + auto stream = content_session->write(claim); + stream->write(gsl::make_span(content).as_span<const std::byte>()); + flow_file->setResourceClaim(claim); + flow_file->setSize(stream->size()); + flow_file->setOffset(0); + + stream->close(); + content_session->commit(); + + repository->flush(); + repository->stop(); + + using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime; + REQUIRE(verifyEventHappenedInPollTime(std::chrono::seconds(5), [&original_content_repo_size, &content_repo] { + auto new_content_repo_size = content_repo->getRepositorySize(); + return new_content_repo_size > original_content_repo_size; + }, + std::chrono::milliseconds(50))); + + REQUIRE(expected_is_full == content_repo->isFull()); + REQUIRE(expected_max_repo_size == content_repo->getMaxRepositorySize()); + REQUIRE(1 == content_repo->getRepositoryEntryCount()); +} + +TEST_CASE("Flow file repositories can be stopped", "[TestRepoIsRunning]") { + LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); + LogTestController::getInstance().setDebug<minifi::provenance::ProvenanceRepository>(); + LogTestController::getInstance().setDebug<core::repository::VolatileFlowFileRepository>(); + LogTestController::getInstance().setDebug<core::repository::VolatileProvenanceRepository>(); + TestController testController; + auto dir = testController.createTempDirectory(); + + std::shared_ptr<core::Repository> repository; + SECTION("FlowFileRepository") { + repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); + } + + SECTION("ProvenanceRepository") { + repository = std::make_shared<minifi::provenance::ProvenanceRepository>("ff", dir.string(), 0ms, 0, 1ms); + } + + SECTION("VolatileFlowFileRepository") { + repository = std::make_shared<core::repository::VolatileFlowFileRepository>("ff", dir.string(), 0ms, 10, 1ms); + } + + SECTION("VolatileProvenanceRepository") { + repository = std::make_shared<core::repository::VolatileProvenanceRepository>("ff", dir.string(), 0ms, 10, 1ms); + } + + SECTION("NoOpRepository") { + repository = minifi::core::createRepository("NoOpRepository", "ff"); + } + + repository->initialize(std::make_shared<minifi::Configure>()); + + REQUIRE(!repository->isRunning()); + repository->start(); + REQUIRE(repository->isRunning()); + repository->stop(); + REQUIRE(!repository->isRunning()); +} + +TEST_CASE("Content repositories are always running", "[TestRepoIsRunning]") { + LogTestController::getInstance().setDebug<core::ContentRepository>(); + LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>(); + LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>(); + LogTestController::getInstance().setDebug<core::repository::DatabaseContentRepository>(); + TestController testController; + auto dir = testController.createTempDirectory(); + + std::shared_ptr<core::ContentRepository> content_repo; + SECTION("FileSystemRepository") { + content_repo = std::make_shared<core::repository::FileSystemRepository>(); + } + + SECTION("VolatileContentRepository") { + content_repo = std::make_shared<core::repository::VolatileContentRepository>("content"); + } + + SECTION("DatabaseContentRepository") { + content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + } + + REQUIRE(content_repo->isRunning()); +} + } // namespace diff --git a/libminifi/test/unit/ControllerSocketMetricsPublisherTest.cpp b/libminifi/test/unit/ControllerSocketMetricsPublisherTest.cpp index 71a336ac2..8db4e2f00 100644 --- a/libminifi/test/unit/ControllerSocketMetricsPublisherTest.cpp +++ b/libminifi/test/unit/ControllerSocketMetricsPublisherTest.cpp @@ -63,7 +63,7 @@ class ControllerSocketMetricsPublisherTestFixture { public: ControllerSocketMetricsPublisherTestFixture() : configuration_(std::make_shared<Configure>()), - response_node_loader_(std::make_shared<state::response::ResponseNodeLoader>(configuration_, nullptr, nullptr, nullptr)), + response_node_loader_(std::make_shared<state::response::ResponseNodeLoader>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr)), test_response_node_(std::make_shared<TestQueueMetrics>()), controller_socket_metrics_publisher_("test_publisher") { controller_socket_metrics_publisher_.initialize(configuration_, response_node_loader_); diff --git a/libminifi/test/unit/FileUtilsTests.cpp b/libminifi/test/unit/FileUtilsTests.cpp index fc87866b7..d537f6f27 100644 --- a/libminifi/test/unit/FileUtilsTests.cpp +++ b/libminifi/test/unit/FileUtilsTests.cpp @@ -477,3 +477,42 @@ TEST_CASE("FileUtils::get_relative_path", "[TestGetRelativePath]") { REQUIRE(*FileUtils::get_relative_path(path, base_path / "") == std::filesystem::path("subdir") / "file.log"); REQUIRE(*FileUtils::get_relative_path(base_path, base_path) == "."); } + +TEST_CASE("FileUtils::path_size", "[TestPathSize]") { + auto writeToFile = [](const std::filesystem::path& path) { + std::ofstream test_file_stream(path, std::ios::out | std::ios::binary); + test_file_stream << "foo\n"; + test_file_stream.flush(); + }; + + TestController test_controller; + REQUIRE(FileUtils::path_size({""}) == 0); + REQUIRE(FileUtils::path_size({"/random/non-existent/dir"}) == 0); + + auto dir = test_controller.createTempDirectory(); + REQUIRE(FileUtils::path_size(dir) == 0); + + auto test_file = dir / "test_file.log"; + writeToFile(test_file); + + REQUIRE(FileUtils::path_size(test_file) == 4); + REQUIRE(FileUtils::path_size(dir) == 4); + + auto subdir = dir / "subdir"; + REQUIRE(utils::file::create_dir(subdir) == 0); + + REQUIRE(FileUtils::path_size(dir) == 4); + + auto subdir_test_file = subdir / "test_file2.log"; + writeToFile(subdir_test_file); + + REQUIRE(FileUtils::path_size(dir) == 8); + + auto subsubdir = subdir / "subsubdir"; + REQUIRE(utils::file::create_dir(subsubdir) == 0); + + auto subsubdir_test_file = subsubdir / "test_file3.log"; + writeToFile(subsubdir_test_file); + + REQUIRE(FileUtils::path_size(dir) == 12); +} diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp index 972365fd4..af6d9e963 100644 --- a/libminifi/test/unit/MetricsTests.cpp +++ b/libminifi/test/unit/MetricsTests.cpp @@ -26,16 +26,22 @@ #include "repository/VolatileContentRepository.h" #include "ProvenanceTestHelper.h" #include "../DummyProcessor.h" +#include "range/v3/algorithm/find_if.hpp" using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::test { +void checkSerializedValue(const std::vector<org::apache::nifi::minifi::state::response::SerializedResponseNode>& children, const std::string& name, const std::string& expected_value) { + auto it = ranges::find_if(children, [&](const auto& child) { return child.name == name; }); + REQUIRE(it != children.end()); + REQUIRE(expected_value == it->value.to_string()); +} + TEST_CASE("QueueMetricsTestNoConnections", "[c2m2]") { minifi::state::response::QueueMetrics metrics; REQUIRE("QueueMetrics" == metrics.getName()); - REQUIRE(metrics.serialize().empty()); } @@ -58,40 +64,24 @@ TEST_CASE("QueueMetricsTestConnections", "[c2m3]") { metrics.updateConnection(connection.get()); - REQUIRE(1 == metrics.serialize().size()); + auto seialized_metrics = metrics.serialize(); + REQUIRE(1 == seialized_metrics.size()); minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); REQUIRE("testconnection" == resp.name); - REQUIRE(4 == resp.children.size()); - minifi::state::response::SerializedResponseNode datasize = resp.children.at(0); - - REQUIRE("datasize" == datasize.name); - REQUIRE("0" == datasize.value.to_string()); - - minifi::state::response::SerializedResponseNode datasizemax = resp.children.at(1); - - REQUIRE("datasizemax" == datasizemax.name); - REQUIRE("1024" == datasizemax.value); - - minifi::state::response::SerializedResponseNode queued = resp.children.at(2); - - REQUIRE("queued" == queued.name); - REQUIRE("0" == queued.value.to_string()); - - minifi::state::response::SerializedResponseNode queuedmax = resp.children.at(3); - - REQUIRE("queuedmax" == queuedmax.name); - REQUIRE("1024" == queuedmax.value.to_string()); + checkSerializedValue(resp.children, "datasize", "0"); + checkSerializedValue(resp.children, "datasizemax", "1024"); + checkSerializedValue(resp.children, "queued", "0"); + checkSerializedValue(resp.children, "queuedmax", "1024"); } TEST_CASE("RepositorymetricsNoRepo", "[c2m4]") { minifi::state::response::RepositoryMetrics metrics; REQUIRE("RepositoryMetrics" == metrics.getName()); - REQUIRE(metrics.serialize().empty()); } @@ -109,23 +99,13 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); REQUIRE("repo_name" == resp.name); + REQUIRE(5 == resp.children.size()); - REQUIRE(3 == resp.children.size()); - - minifi::state::response::SerializedResponseNode running = resp.children.at(0); - - REQUIRE("running" == running.name); - REQUIRE("false" == running.value.to_string()); - - minifi::state::response::SerializedResponseNode full = resp.children.at(1); - - REQUIRE("full" == full.name); - REQUIRE("false" == full.value); - - minifi::state::response::SerializedResponseNode size = resp.children.at(2); - - REQUIRE("size" == size.name); - REQUIRE("0" == size.value); + checkSerializedValue(resp.children, "running", "false"); + checkSerializedValue(resp.children, "full", "false"); + checkSerializedValue(resp.children, "size", "0"); + checkSerializedValue(resp.children, "maxSize", "0"); + checkSerializedValue(resp.children, "entryCount", "0"); } repo->start(); @@ -135,26 +115,16 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); REQUIRE("repo_name" == resp.name); + REQUIRE(5 == resp.children.size()); - REQUIRE(3 == resp.children.size()); - - minifi::state::response::SerializedResponseNode running = resp.children.at(0); - - REQUIRE("running" == running.name); - REQUIRE("true" == running.value.to_string()); - - minifi::state::response::SerializedResponseNode full = resp.children.at(1); - - REQUIRE("full" == full.name); - REQUIRE("false" == full.value); - - minifi::state::response::SerializedResponseNode size = resp.children.at(2); - - REQUIRE("size" == size.name); - REQUIRE("0" == size.value); + checkSerializedValue(resp.children, "running", "true"); + checkSerializedValue(resp.children, "full", "false"); + checkSerializedValue(resp.children, "size", "0"); + checkSerializedValue(resp.children, "maxSize", "0"); + checkSerializedValue(resp.children, "entryCount", "0"); } - repo->setFull(); + repo->stop(); { REQUIRE(1 == metrics.serialize().size()); @@ -162,50 +132,54 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); REQUIRE("repo_name" == resp.name); + REQUIRE(5 == resp.children.size()); - REQUIRE(3 == resp.children.size()); - - minifi::state::response::SerializedResponseNode running = resp.children.at(0); - - REQUIRE("running" == running.name); - REQUIRE("true" == running.value.to_string()); - - minifi::state::response::SerializedResponseNode full = resp.children.at(1); - - REQUIRE("full" == full.name); - REQUIRE("true" == full.value.to_string()); + checkSerializedValue(resp.children, "running", "false"); + checkSerializedValue(resp.children, "full", "false"); + checkSerializedValue(resp.children, "size", "0"); + checkSerializedValue(resp.children, "maxSize", "0"); + checkSerializedValue(resp.children, "entryCount", "0"); + } +} - minifi::state::response::SerializedResponseNode size = resp.children.at(2); +TEST_CASE("VolatileRepositorymetricsCanBeFull", "[c2m4]") { + minifi::state::response::RepositoryMetrics metrics; - REQUIRE("size" == size.name); - REQUIRE("0" == size.value.to_string()); - } + REQUIRE("RepositoryMetrics" == metrics.getName()); - repo->stop(); + auto repo = std::make_shared<TestVolatileRepository>(); + metrics.addRepository(repo); { REQUIRE(1 == metrics.serialize().size()); minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); REQUIRE("repo_name" == resp.name); + REQUIRE(5 == resp.children.size()); - REQUIRE(3 == resp.children.size()); - - minifi::state::response::SerializedResponseNode running = resp.children.at(0); + checkSerializedValue(resp.children, "running", "false"); + checkSerializedValue(resp.children, "full", "false"); + checkSerializedValue(resp.children, "size", "0"); + checkSerializedValue(resp.children, "maxSize", std::to_string(static_cast<int64_t>(TEST_MAX_REPOSITORY_STORAGE_SIZE * 0.75))); + checkSerializedValue(resp.children, "entryCount", "0"); + } - REQUIRE("running" == running.name); - REQUIRE("false" == running.value.to_string()); + repo->setFull(); - minifi::state::response::SerializedResponseNode full = resp.children.at(1); + { + REQUIRE(1 == metrics.serialize().size()); - REQUIRE("full" == full.name); - REQUIRE("true" == full.value); + minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); - minifi::state::response::SerializedResponseNode size = resp.children.at(2); + REQUIRE("repo_name" == resp.name); + REQUIRE(5 == resp.children.size()); - REQUIRE("size" == size.name); - REQUIRE("0" == size.value); + checkSerializedValue(resp.children, "running", "false"); + checkSerializedValue(resp.children, "full", "true"); + checkSerializedValue(resp.children, "size", std::to_string(static_cast<int64_t>(TEST_MAX_REPOSITORY_STORAGE_SIZE * 0.75))); + checkSerializedValue(resp.children, "maxSize", std::to_string(static_cast<int64_t>(TEST_MAX_REPOSITORY_STORAGE_SIZE * 0.75))); + checkSerializedValue(resp.children, "entryCount", "10000"); } } diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index f2d04ed70..3f5a16c72 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -28,6 +28,7 @@ #include <utility> #include <vector> #include "core/repository/VolatileContentRepository.h" +#include "core/repository/VolatileFlowFileRepository.h" #include "core/Processor.h" #include "core/ThreadedRepository.h" #include "Connection.h" @@ -39,21 +40,19 @@ using namespace std::literals::chrono_literals; +const int64_t TEST_MAX_REPOSITORY_STORAGE_SIZE = 100; + template <typename T_BaseRepository> class TestRepositoryBase : public T_BaseRepository { public: TestRepositoryBase() - : T_BaseRepository("repo_name", "./dir", 1s, 100, 0ms) { + : T_BaseRepository("repo_name", "./dir", 1s, TEST_MAX_REPOSITORY_STORAGE_SIZE, 0ms) { } bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &) override { return true; } - void setFull() { - T_BaseRepository::repo_full_ = true; - } - bool isNoop() const override { return false; } @@ -125,6 +124,30 @@ class TestRepository : public TestRepositoryBase<org::apache::nifi::minifi::core bool stop() override { return true; } + + uint64_t getRepositorySize() const override { + return 0; + } + + uint64_t getRepositoryEntryCount() const override { + return 0; + } +}; + +class TestVolatileRepository : public TestRepositoryBase<org::apache::nifi::minifi::core::repository::VolatileFlowFileRepository> { + public: + bool start() override { + return true; + } + + bool stop() override { + return true; + } + + void setFull() { + repo_data_.current_size = repo_data_.max_size; + repo_data_.current_entry_count = repo_data_.max_count; + } }; class TestThreadedRepository : public TestRepositoryBase<org::apache::nifi::minifi::core::ThreadedRepository> { @@ -142,13 +165,21 @@ class TestThreadedRepository : public TestRepositoryBase<org::apache::nifi::mini return thread_; } + uint64_t getRepositorySize() const override { + return 0; + } + + uint64_t getRepositoryEntryCount() const override { + return 0; + } + std::thread thread_; }; class TestFlowRepository : public org::apache::nifi::minifi::core::ThreadedRepository { public: TestFlowRepository() - : org::apache::nifi::minifi::core::ThreadedRepository("ff", "./dir", 1s, 100, 0ms) { + : org::apache::nifi::minifi::core::ThreadedRepository("ff", "./dir", 1s, TEST_MAX_REPOSITORY_STORAGE_SIZE, 0ms) { } bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &) override { @@ -178,6 +209,14 @@ class TestFlowRepository : public org::apache::nifi::minifi::core::ThreadedRepos } } + uint64_t getRepositorySize() const override { + return 0; + } + + uint64_t getRepositoryEntryCount() const override { + return 0; + } + private: void run() override { } diff --git a/libminifi/test/unit/ResponseNodeLoaderTests.cpp b/libminifi/test/unit/ResponseNodeLoaderTests.cpp index 1c5d41ea1..f0926523e 100644 --- a/libminifi/test/unit/ResponseNodeLoaderTests.cpp +++ b/libminifi/test/unit/ResponseNodeLoaderTests.cpp @@ -36,7 +36,7 @@ class ResponseNodeLoaderTestFixture { prov_repo_(std::make_shared<TestRepository>()), ff_repository_(std::make_shared<TestRepository>()), content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()), - response_node_loader_(configuration_, prov_repo_, ff_repository_, nullptr) { + response_node_loader_(configuration_, {prov_repo_, ff_repository_, content_repo_}, nullptr) { ff_repository_->initialize(configuration_); content_repo_->initialize(configuration_); auto uuid1 = addProcessor<minifi::processors::WriteToFlowFileTestProcessor>("WriteToFlowFileTestProcessor1"); diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp index 29476495c..1c97cb757 100644 --- a/minifi_main/MiNiFiMain.cpp +++ b/minifi_main/MiNiFiMain.cpp @@ -371,7 +371,8 @@ int main(int argc, char **argv) { .path = configure->get(minifi::Configure::nifi_flow_configuration_file), .filesystem = filesystem}, nifi_configuration_class_name); - auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configure, prov_repo, flow_repo, flow_configuration); + std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{prov_repo, flow_repo, content_repo}; + auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configure, repo_metric_sources, flow_configuration); const auto controller = std::make_unique<minifi::FlowController>( prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, std::move(metrics_publisher_store), filesystem, request_restart);