This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 93ef73a00f8775f0bdcd63554ab10fdabb49652f
Author: Gabor Gyimesi <gamezb...@gmail.com>
AuthorDate: Mon Jan 2 15:08:34 2023 +0100

    MINIFICPP-2022 Add valid repository size metrics for all repositories
    
    Signed-off-by: Ferenc Gerlits <fgerl...@gmail.com>
    This closes #1490
---
 METRICS.md                                         |  40 ++--
 controller/tests/ControllerTests.cpp               |   4 +-
 .../cluster/checkers/PrometheusChecker.py          |  19 +-
 extensions/coap/tests/CoapIntegrationBase.h        |   4 +-
 extensions/http-curl/tests/C2PauseResumeTest.cpp   |   3 +-
 .../tests/PrometheusMetricsPublisherTest.cpp       |   9 +-
 .../rocksdb-repos/DatabaseContentRepository.cpp    |  19 ++
 .../rocksdb-repos/DatabaseContentRepository.h      |   3 +
 extensions/rocksdb-repos/FlowFileRepository.cpp    |  73 +------
 extensions/rocksdb-repos/FlowFileRepository.h      |  33 +--
 extensions/rocksdb-repos/ProvenanceRepository.cpp  |  94 +++------
 extensions/rocksdb-repos/ProvenanceRepository.h    |  25 +--
 extensions/rocksdb-repos/RocksDbRepository.cpp     | 112 +++++++++++
 extensions/rocksdb-repos/RocksDbRepository.h       |  63 ++++++
 extensions/rocksdb-repos/database/OpenRocksDb.cpp  |  13 ++
 extensions/rocksdb-repos/database/OpenRocksDb.h    |   3 +
 libminifi/include/core/ContentRepository.h         |  12 +-
 libminifi/include/core/Repository.h                |  30 +--
 libminifi/include/core/RepositoryFactory.h         |   5 +-
 libminifi/include/core/RepositoryMetricsSource.h   |  45 +++++
 .../include/core/repository/FileSystemRepository.h |  14 ++
 .../core/repository/VolatileContentRepository.h    |  18 +-
 .../include/core/repository/VolatileRepository.h   |  16 +-
 .../core/repository/VolatileRepositoryData.h       |  18 ++
 .../include/core/state/MetricsPublisherStore.h     |   5 +-
 .../include/core/state/nodes/AgentInformation.h    |  73 ++++---
 .../include/core/state/nodes/RepositoryMetrics.h   |  56 ++++--
 .../include/core/state/nodes/ResponseNodeLoader.h  |   8 +-
 libminifi/include/utils/file/FileUtils.h           |  14 ++
 libminifi/src/core/RepositoryFactory.cpp           |   9 +
 .../src/core/repository/VolatileRepository.cpp     |   6 +-
 .../src/core/repository/VolatileRepositoryData.cpp |   1 +
 libminifi/src/core/state/MetricsPublisherStore.cpp |   6 +-
 .../src/core/state/nodes/ResponseNodeLoader.cpp    |  26 +--
 libminifi/test/flow-tests/TestControllerWithFlow.h |   4 +-
 libminifi/test/integration/IntegrationBase.h       |   5 +-
 .../rocksdb-tests/DBProvenanceRepositoryTests.cpp  |   2 +-
 libminifi/test/rocksdb-tests/RepoTests.cpp         | 221 +++++++++++++++++++++
 .../unit/ControllerSocketMetricsPublisherTest.cpp  |   2 +-
 libminifi/test/unit/FileUtilsTests.cpp             |  39 ++++
 libminifi/test/unit/MetricsTests.cpp               | 140 ++++++-------
 libminifi/test/unit/ProvenanceTestHelper.h         |  51 ++++-
 libminifi/test/unit/ResponseNodeLoaderTests.cpp    |   2 +-
 minifi_main/MiNiFiMain.cpp                         |   3 +-
 44 files changed, 931 insertions(+), 417 deletions(-)

diff --git a/METRICS.md b/METRICS.md
index f6c87dbaf..f0cc52d54 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -100,17 +100,19 @@ QueueMetrics is a system level metric that reports queue 
metrics for every conne
 
 ### RepositoryMetrics
 
-RepositoryMetrics is a system level metric that reports metrics for the registered repositories (by default flowfile and provenance repository)
+RepositoryMetrics is a system level metric that reports metrics for the registered repositories (by default flowfile, content, and provenance repositories)
 
-| Metric name          | Labels          | Description                         
  |
-|----------------------|-----------------|---------------------------------------|
-| is_running           | repository_name | Is the repository running (1 or 0)  
  |
-| is_full              | repository_name | Is the repository full (1 or 0)     
  |
-| repository_size      | repository_name | Current size of the repository      
  |
+| Metric name               | Labels          | Description                    
                 |
+|---------------------------|-----------------|-------------------------------------------------|
+| is_running                | repository_name | Is the repository running (1 
or 0)              |
+| is_full                   | repository_name | Is the repository full (1 or 
0)                 |
+| repository_size_bytes     | repository_name | Current size of the repository 
                 |
+| max_repository_size_bytes | repository_name | Maximum size of the repository 
(0 if unlimited) |
+| repository_entry_count    | repository_name | Current number of entries in 
the repository     |
 
-| Label                    | Description                                       
              |
-|--------------------------|-----------------------------------------------------------------|
-| repository_name          | Name of the reported repository                   
              |
+| Label                    | Description                                       
                                                                                
    |
+|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
+| repository_name          | Name of the reported repository. There are three repositories present with the following names: `flowfile`, `content` and `provenance` |
 
 ### DeviceInfoNode
 
@@ -145,15 +147,17 @@ FlowInformation is a system level metric that reports 
component and queue relate
 
 AgentStatus is a system level metric that defines current agent status including repository, component and resource usage information.
 
-| Metric name              | Labels                         | Description      
                                                                                
          |
-|--------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------|
-| is_running               | repository_name                | Is the 
repository running (1 or 0)                                                     
                    |
-| is_full                  | repository_name                | Is the 
repository full (1 or 0)                                                        
                    |
-| repository_size          | repository_name                | Current size of 
the repository                                                                  
           |
-| uptime_milliseconds      | -                              | Agent uptime in 
milliseconds                                                                    
           |
-| is_running               | component_uuid, component_name | Check if the 
component is running (1 or 0)                                                   
              |
-| agent_memory_usage_bytes | -                              | Memory used by 
the agent process in bytes                                                      
            |
-| agent_cpu_utilization    | -                              | CPU utilization 
of the agent process (between 0 and 1). In case of a query error the returned 
value is -1. |
+| Metric name               | Labels                         | Description     
                                                                                
           |
+|---------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------|
+| is_running                | repository_name                | Is the 
repository running (1 or 0)                                                     
                    |
+| is_full                   | repository_name                | Is the 
repository full (1 or 0)                                                        
                    |
+| repository_size_bytes     | repository_name                | Current size of 
the repository                                                                  
           |
+| max_repository_size_bytes | repository_name                | Maximum size of 
the repository (0 if unlimited)                                                 
           |
+| repository_entry_count    | repository_name                | Current number 
of entries in the repository                                                    
            |
+| uptime_milliseconds       | -                              | Agent uptime in 
milliseconds                                                                    
           |
+| is_running                | component_uuid, component_name | Check if the 
component is running (1 or 0)                                                   
              |
+| agent_memory_usage_bytes  | -                              | Memory used by 
the agent process in bytes                                                      
            |
+| agent_cpu_utilization     | -                              | CPU utilization 
of the agent process (between 0 and 1). In case of a query error the returned 
value is -1. |
 
 | Label           | Description                                              |
 |-----------------|----------------------------------------------------------|
diff --git a/controller/tests/ControllerTests.cpp 
b/controller/tests/ControllerTests.cpp
index 49ad26f79..c88af54df 100644
--- a/controller/tests/ControllerTests.cpp
+++ b/controller/tests/ControllerTests.cpp
@@ -514,7 +514,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test manifest 
getter", "[controllerTest
   }
 
   auto reporter = 
std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
-  auto response_node_loader = 
std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, 
nullptr, nullptr, nullptr);
+  auto response_node_loader = 
std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
   reporter->initialize(configuration_, response_node_loader);
   initalizeControllerSocket(reporter);
 
@@ -538,7 +538,7 @@ TEST_CASE_METHOD(ControllerTestFixture, "Test jstack 
getter", "[controllerTests]
   }
 
   auto reporter = 
std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
-  auto response_node_loader = 
std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, 
nullptr, nullptr, nullptr);
+  auto response_node_loader = 
std::make_shared<minifi::state::response::ResponseNodeLoader>(configuration_, 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
   reporter->initialize(configuration_, response_node_loader);
   initalizeControllerSocket(reporter);
 
diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py 
b/docker/test/integration/cluster/checkers/PrometheusChecker.py
index 461e9a5e8..bd7daff8b 100644
--- a/docker/test/integration/cluster/checkers/PrometheusChecker.py
+++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py
@@ -47,8 +47,9 @@ class PrometheusChecker:
             raise Exception("Metric class '%s' verification is not 
implemented" % metric_class)
 
     def verify_repository_metrics(self):
-        label_list = [{'repository_name': 'provenance'}, {'repository_name': 
'flowfile'}]
-        return all((self.verify_metrics_exist(['minifi_is_running', 
'minifi_is_full', 'minifi_repository_size'], 'RepositoryMetrics', labels) for 
labels in label_list))
+        label_list = [{'repository_name': 'provenance'}, {'repository_name': 
'flowfile'}, {'repository_name': 'content'}]
+        return all((self.verify_metrics_exist(['minifi_is_running', 
'minifi_is_full', 'minifi_repository_size_bytes', 
'minifi_max_repository_size_bytes', 'minifi_repository_entry_count'], 
'RepositoryMetrics', labels) for labels in label_list)) and \
+            
all((self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 
'RepositoryMetrics', labels) for labels in label_list[1:3]))
 
     def verify_queue_metrics(self):
         return self.verify_metrics_exist(['minifi_queue_data_size', 
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 
'QueueMetrics')
@@ -72,12 +73,22 @@ class PrometheusChecker:
         return self.verify_metrics_exist(['minifi_physical_mem', 
'minifi_memory_usage', 'minifi_cpu_utilization'], 'DeviceInfoNode')
 
     def verify_agent_status_metrics(self):
-        label_list = [{'repository_name': 'provenance'}, {'repository_name': 
'flowfile'}]
+        label_list = [{'repository_name': 'flowfile'}, {'repository_name': 
'content'}]
         for labels in label_list:
             if not (self.verify_metric_exists('minifi_is_running', 
'AgentStatus', labels)
                     and self.verify_metric_exists('minifi_is_full', 
'AgentStatus', labels)
-                    and self.verify_metric_exists('minifi_repository_size', 
'AgentStatus', labels)):
+                    and 
self.verify_metric_exists('minifi_max_repository_size_bytes', 'AgentStatus', 
labels)
+                    and 
self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 
'AgentStatus', labels)
+                    and 
self.verify_metric_exists('minifi_repository_entry_count', 'AgentStatus', 
labels)):
                 return False
+
+        # provenance repository is NoOpRepository by default which has zero 
size
+        if not (self.verify_metric_exists('minifi_is_running', 'AgentStatus', 
{'repository_name': 'provenance'})
+                and self.verify_metric_exists('minifi_is_full', 'AgentStatus', 
{'repository_name': 'provenance'})
+                and 
self.verify_metric_exists('minifi_max_repository_size_bytes', 'AgentStatus', 
{'repository_name': 'provenance'})
+                and self.verify_metric_exists('minifi_repository_size_bytes', 
'AgentStatus', {'repository_name': 'provenance'})
+                and self.verify_metric_exists('minifi_repository_entry_count', 
'AgentStatus', {'repository_name': 'provenance'})):
+            return False
         return self.verify_metric_exists('minifi_uptime_milliseconds', 
'AgentStatus') and \
             self.verify_metric_exists('minifi_agent_memory_usage_bytes', 
'AgentStatus') and \
             self.verify_metric_exists('minifi_agent_cpu_utilization', 
'AgentStatus')
diff --git a/extensions/coap/tests/CoapIntegrationBase.h 
b/extensions/coap/tests/CoapIntegrationBase.h
index b498e1240..e7fbc1e65 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -21,6 +21,7 @@
 #include <optional>
 #include <string>
 #include <utility>
+#include <vector>
 
 #include "../tests/TestServer.h"
 #include "CivetServer.h"
@@ -72,7 +73,8 @@ class CoapIntegrationBase : public IntegrationBase {
 
     queryRootProcessGroup(pg);
 
-    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configuration, 
test_repo, test_flow_repo, yaml_ptr);
+    std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repo_metric_sources{test_repo, test_flow_repo, content_repo};
+    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configuration, 
repo_metric_sources, yaml_ptr);
     auto controller = std::make_unique<minifi::FlowController>(test_repo, 
test_flow_repo, configuration, std::move(yaml_ptr), content_repo, 
std::move(metrics_publisher_store));
 
     controller->load();
diff --git a/extensions/http-curl/tests/C2PauseResumeTest.cpp 
b/extensions/http-curl/tests/C2PauseResumeTest.cpp
index 78bd1c54f..cf8e137a9 100644
--- a/extensions/http-curl/tests/C2PauseResumeTest.cpp
+++ b/extensions/http-curl/tests/C2PauseResumeTest.cpp
@@ -135,7 +135,8 @@ int main(int argc, char **argv) {
 
   auto yaml_ptr = 
std::make_shared<core::YamlConfiguration>(core::ConfigurationContext{test_repo, 
content_repo, stream_factory, configuration, args.test_file});
 
-  auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configuration, 
test_repo, test_flow_repo, yaml_ptr);
+  std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repo_metric_sources{test_repo, test_flow_repo, content_repo};
+  auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configuration, 
repo_metric_sources, yaml_ptr);
   std::shared_ptr<minifi::FlowController> controller = 
std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, 
configuration,
       std::move(yaml_ptr), content_repo, std::move(metrics_publisher_store));
 
diff --git a/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp 
b/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
index 6b39468f5..dcb11082f 100644
--- a/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
+++ b/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
@@ -52,7 +52,9 @@ class PrometheusPublisherTestFixture {
     : configuration_(std::make_shared<Configure>()),
       provenance_repo_(core::createRepository("provenancerepository")),
       flow_file_repo_(core::createRepository("flowfilerepository")),
-      
response_node_loader_(std::make_shared<state::response::ResponseNodeLoader>(configuration_,
 provenance_repo_, flow_file_repo_, nullptr)) {
+      
content_repo_(core::createContentRepository("volatilecontentrepository")),
+      
response_node_loader_(std::make_shared<state::response::ResponseNodeLoader>(configuration_,
+        
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{provenance_repo_, 
flow_file_repo_, content_repo_}, nullptr)) {
     std::unique_ptr<DummyMetricsExposer> dummy_exposer;
     if (user_dummy_exposer) {
       dummy_exposer = std::make_unique<DummyMetricsExposer>();
@@ -63,8 +65,9 @@ class PrometheusPublisherTestFixture {
 
  protected:
   std::shared_ptr<Configure> configuration_;
-  std::shared_ptr<core::Repository> provenance_repo_;
-  std::shared_ptr<core::Repository> flow_file_repo_;
+  std::shared_ptr<core::RepositoryMetricsSource> provenance_repo_;
+  std::shared_ptr<core::RepositoryMetricsSource> flow_file_repo_;
+  std::shared_ptr<core::RepositoryMetricsSource> content_repo_;
   std::shared_ptr<state::response::ResponseNodeLoader> response_node_loader_;
   std::unique_ptr<PrometheusMetricsPublisher> publisher_;
   DummyMetricsExposer* exposer_ = nullptr;
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp 
b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 187776778..56c0d2c83 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -267,6 +267,25 @@ void DatabaseContentRepository::clearOrphans() {
   }
 }
 
+uint64_t DatabaseContentRepository::getRepositorySize() const {
+  return (utils::optional_from_ptr(db_.get()) |
+          utils::flatMap([](const auto& db) { return db->open(); }) |
+          utils::flatMap([](const auto& opendb) { return 
opendb.getApproximateSizes(); })).value_or(0);
+}
+
+uint64_t DatabaseContentRepository::getRepositoryEntryCount() const {
+  return (utils::optional_from_ptr(db_.get()) |
+          utils::flatMap([](const auto& db) { return db->open(); }) |
+          utils::flatMap([](auto&& opendb) -> std::optional<uint64_t> {
+              std::string key_count;
+              opendb.GetProperty("rocksdb.estimate-num-keys", &key_count);
+              if (!key_count.empty()) {
+                return std::stoull(key_count);
+              }
+              return std::nullopt;
+            })).value_or(0);
+}
+
 REGISTER_RESOURCE_AS(DatabaseContentRepository, InternalResource, 
("DatabaseContentRepository", "databasecontentrepository"));
 
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h 
b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 1305a7c2d..e084a6515 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -74,6 +74,9 @@ class DatabaseContentRepository : public 
core::ContentRepository {
   void start() override;
   void stop() override;
 
+  uint64_t getRepositorySize() const override;
+  uint64_t getRepositoryEntryCount() const override;
+
  protected:
   bool removeKey(const std::string& content_path) override;
 
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp 
b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 36999c4e0..b38a7fdce 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -30,6 +30,7 @@
 #include "FlowFileRecord.h"
 #include "utils/gsl.h"
 #include "core/Resource.h"
+#include "utils/OptionalUtils.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -110,24 +111,6 @@ void 
FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal
   }
 }
 
-void FlowFileRepository::printStats() {
-  auto opendb = db_->open();
-  if (!opendb) {
-    return;
-  }
-  std::string key_count;
-  opendb->GetProperty("rocksdb.estimate-num-keys", &key_count);
-
-  std::string table_readers;
-  opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
-
-  std::string all_memtables;
-  opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
-
-  logger_->log_info("Repository stats: key count: %s, table readers size: %s, 
all memory tables size: %s",
-      key_count, table_readers, all_memtables);
-}
-
 void FlowFileRepository::run() {
   auto last = std::chrono::steady_clock::now();
   while (isRunning()) {
@@ -142,22 +125,6 @@ void FlowFileRepository::run() {
   flush();
 }
 
-bool FlowFileRepository::ExecuteWithRetry(const 
std::function<rocksdb::Status()>& operation) {
-  constexpr int RETRY_COUNT = 3;
-  std::chrono::milliseconds wait_time = 0ms;
-  for (int i=0; i < RETRY_COUNT; ++i) {
-    auto status = operation();
-    if (status.ok()) {
-      logger_->log_trace("Rocksdb operation executed successfully");
-      return true;
-    }
-    logger_->log_error("Rocksdb operation failed: %s", status.ToString());
-    wait_time += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS;
-    std::this_thread::sleep_for(wait_time);
-  }
-  return false;
-}
-
 void FlowFileRepository::initialize_repository() {
   auto opendb = db_->open();
   if (!opendb) {
@@ -204,7 +171,6 @@ void FlowFileRepository::initialize_repository() {
 
 void FlowFileRepository::loadComponent(const 
std::shared_ptr<core::ContentRepository> &content_repo) {
   content_repo_ = content_repo;
-  repo_size_ = 0;
   swap_loader_ = 
std::make_unique<FlowFileLoader>(gsl::make_not_null(db_.get()), content_repo_);
 
   initialize_repository();
@@ -276,48 +242,11 @@ void FlowFileRepository::setCompactionPeriod(const 
std::shared_ptr<Configure> &c
   }
 }
 
-bool FlowFileRepository::Put(const std::string& key, const uint8_t *buf, 
size_t bufLen) {
-  // persistent to the DB
-  auto opendb = db_->open();
-  if (!opendb) {
-    return false;
-  }
-  rocksdb::Slice value((const char *) buf, bufLen);
-  auto operation = [&key, &value, &opendb]() { return 
opendb->Put(rocksdb::WriteOptions(), key, value); };
-  return ExecuteWithRetry(operation);
-}
-
-bool FlowFileRepository::MultiPut(const std::vector<std::pair<std::string, 
std::unique_ptr<minifi::io::BufferStream>>>& data) {
-  auto opendb = db_->open();
-  if (!opendb) {
-    return false;
-  }
-  auto batch = opendb->createWriteBatch();
-  for (const auto &item : data) {
-    const auto buf = item.second->getBuffer().as_span<const char>();
-    rocksdb::Slice value(buf.data(), buf.size());
-    if (!batch.Put(item.first, value).ok()) {
-      logger_->log_error("Failed to add item to batch operation");
-      return false;
-    }
-  }
-  auto operation = [&batch, &opendb]() { return 
opendb->Write(rocksdb::WriteOptions(), &batch); };
-  return ExecuteWithRetry(operation);
-}
-
 bool FlowFileRepository::Delete(const std::string& key) {
   keys_to_delete_.enqueue({.key = key});
   return true;
 }
 
-bool FlowFileRepository::Get(const std::string &key, std::string &value) {
-  auto opendb = db_->open();
-  if (!opendb) {
-    return false;
-  }
-  return opendb->Get(rocksdb::ReadOptions(), key, &value).ok();
-}
-
 void FlowFileRepository::runCompaction() {
   do {
     if (auto opendb = db_->open()) {
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h 
b/extensions/rocksdb-repos/FlowFileRepository.h
index 68959ce88..dd1412ada 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -23,13 +23,11 @@
 #include <list>
 
 #include "utils/file/FileUtils.h"
-#include "rocksdb/db.h"
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/utilities/checkpoint.h"
 #include "core/Core.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "core/ThreadedRepository.h"
 #include "Connection.h"
 #include "concurrentqueue.h"
 #include "database/RocksDatabase.h"
@@ -40,6 +38,7 @@
 #include "range/v3/algorithm/all_of.hpp"
 #include "utils/Literals.h"
 #include "utils/StoppableThread.h"
+#include "RocksDbRepository.h"
 
 namespace org::apache::nifi::minifi::core::repository {
 
@@ -53,13 +52,12 @@ constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = 
"./flowfile_checkpoint";
 constexpr auto MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE = 10_MiB;
 constexpr auto MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME = 
std::chrono::minutes(10);
 constexpr auto FLOWFILE_REPOSITORY_PURGE_PERIOD = std::chrono::seconds(2);
-constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = 
std::chrono::milliseconds(500);
 
 /**
  * Flow File repository
  * Design: Extends Repository and implements the run function, using rocksdb 
as the primary substrate.
  */
-class FlowFileRepository : public ThreadedRepository, public SwapManager {
+class FlowFileRepository : public RocksDbRepository, public SwapManager {
   static constexpr std::chrono::milliseconds DEFAULT_COMPACTION_PERIOD = 
std::chrono::minutes{2};
 
   struct ExpiredFlowFileInfo {
@@ -79,8 +77,8 @@ class FlowFileRepository : public ThreadedRepository, public 
SwapManager {
                               std::chrono::milliseconds maxPartitionMillis = 
MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
                               int64_t maxPartitionBytes = 
MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
                               std::chrono::milliseconds purgePeriod = 
FLOWFILE_REPOSITORY_PURGE_PERIOD)
-    : ThreadedRepository(repo_name.length() > 0 ? std::move(repo_name) : 
core::getClassName<FlowFileRepository>(), std::move(directory), 
maxPartitionMillis, maxPartitionBytes, purgePeriod),
-      logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()) {
+    : RocksDbRepository(repo_name.length() > 0 ? std::move(repo_name) : 
core::getClassName<FlowFileRepository>(),
+                        std::move(directory), maxPartitionMillis, 
maxPartitionBytes, purgePeriod, 
logging::LoggerFactory<FlowFileRepository>::getLogger()) {
   }
 
   ~FlowFileRepository() override {
@@ -95,18 +93,11 @@ class FlowFileRepository : public ThreadedRepository, 
public SwapManager {
     return false;
   }
 
-  void flush() override;
-
-  virtual void printStats();
-
-  bool initialize(const std::shared_ptr<Configure> &configure) override;
-
-  bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override;
-  bool MultiPut(const std::vector<std::pair<std::string, 
std::unique_ptr<minifi::io::BufferStream>>>& data) override;
   bool Delete(const std::string& key) override;
   bool Delete(const std::shared_ptr<core::CoreComponent>& item) override;
-  bool Get(const std::string &key, std::string &value) override;
 
+  void flush() override;
+  bool initialize(const std::shared_ptr<Configure> &configure) override;
   void loadComponent(const std::shared_ptr<core::ContentRepository> 
&content_repo) override;
   bool start() override;
   bool stop() override;
@@ -115,27 +106,17 @@ class FlowFileRepository : public ThreadedRepository, 
public SwapManager {
 
  private:
   void run() override;
+  void initialize_repository();
 
   void runCompaction();
   void setCompactionPeriod(const std::shared_ptr<Configure> &configure);
 
-  bool ExecuteWithRetry(const std::function<rocksdb::Status()>& operation);
-
-  void initialize_repository();
-
   void deserializeFlowFilesWithNoContentClaim(minifi::internal::OpenRocksDb& 
opendb, std::list<ExpiredFlowFileInfo>& flow_files);
 
-  std::thread& getThread() override {
-    return thread_;
-  }
-
   moodycamel::ConcurrentQueue<ExpiredFlowFileInfo> keys_to_delete_;
   std::shared_ptr<core::ContentRepository> content_repo_;
-  std::unique_ptr<minifi::internal::RocksDatabase> db_;
   std::unique_ptr<FlowFileLoader> swap_loader_;
-  std::shared_ptr<logging::Logger> logger_;
   std::shared_ptr<minifi::Configure> config_;
-  std::thread thread_;
 
   std::chrono::milliseconds compaction_period_;
   std::unique_ptr<utils::StoppableThread> compaction_thread_;
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp 
b/extensions/rocksdb-repos/ProvenanceRepository.cpp
index 2680de9d5..15073835e 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.cpp
+++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -23,20 +23,6 @@
 
 namespace org::apache::nifi::minifi::provenance {
 
-void ProvenanceRepository::printStats() {
-  std::string key_count;
-  db_->GetProperty("rocksdb.estimate-num-keys", &key_count);
-
-  std::string table_readers;
-  db_->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
-
-  std::string all_memtables;
-  db_->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
-
-  logger_->log_info("Repository stats: key count: %s, table readers size: %s, 
all memory tables size: %s",
-                    key_count, table_readers, all_memtables);
-}
-
 void ProvenanceRepository::run() {
   size_t count = 0;
   while (isRunning()) {
@@ -66,64 +52,45 @@ bool ProvenanceRepository::initialize(const 
std::shared_ptr<org::apache::nifi::m
   }
   logger_->log_debug("MiNiFi Provenance Max Storage Time: [%" PRId64 "] ms",
                       int64_t{max_partition_millis_.count()});
-  rocksdb::Options options;
-  options.create_if_missing = true;
-  options.use_direct_io_for_flush_and_compaction = true;
-  options.use_direct_reads = true;
-  // Rocksdb write buffers act as a log of database operation: grow till 
reaching the limit, serialized after
-  // This shouldn't go above 16MB and the configured total size of the db 
should cap it as well
-  int64_t max_buffer_size = 16 << 20;
-  options.write_buffer_size = gsl::narrow<size_t>(std::min(max_buffer_size, 
max_partition_bytes_));
-  options.max_write_buffer_number = 4;
-  options.min_write_buffer_number_to_merge = 1;
 
-  options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleFIFO;
-  options.compaction_options_fifo = 
rocksdb::CompactionOptionsFIFO(max_partition_bytes_, false);
-  if (max_partition_millis_ > std::chrono::milliseconds(0)) {
-    options.ttl = 
std::chrono::duration_cast<std::chrono::seconds>(max_partition_millis_).count();
-  }
+  auto db_options = [] (minifi::internal::Writable<rocksdb::DBOptions>& 
db_opts) {
+    db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
+    db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, 
true);
+    db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
+  };
 
-  logger_->log_info("Write buffer: %llu", options.write_buffer_size);
-  logger_->log_info("Max partition bytes: %llu", max_partition_bytes_);
-  logger_->log_info("Ttl: %llu", options.ttl);
+  // Rocksdb write buffers act as a log of database operation: grow till 
reaching the limit, serialized after
+  // This shouldn't go above 16MB and the configured total size of the db 
should cap it as well
+  auto cf_options = [this] (rocksdb::ColumnFamilyOptions& cf_opts) {
+    int64_t max_buffer_size = 16 << 20;
+    cf_opts.write_buffer_size = gsl::narrow<size_t>(std::min(max_buffer_size, 
max_partition_bytes_));
+    cf_opts.max_write_buffer_number = 4;
+    cf_opts.min_write_buffer_number_to_merge = 1;
+
+    cf_opts.compaction_style = rocksdb::CompactionStyle::kCompactionStyleFIFO;
+    cf_opts.compaction_options_fifo = 
rocksdb::CompactionOptionsFIFO(max_partition_bytes_, false);
+    if (max_partition_millis_ > std::chrono::milliseconds(0)) {
+      cf_opts.ttl = 
std::chrono::duration_cast<std::chrono::seconds>(max_partition_millis_).count();
+    }
+  };
 
-  rocksdb::DB* db;
-  rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db);
-  if (status.ok()) {
+  db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, 
directory_);
+  if (db_->open()) {
     logger_->log_debug("MiNiFi Provenance Repository database open %s 
success", directory_);
-    db_.reset(db);
   } else {
-    logger_->log_error("MiNiFi Provenance Repository database open %s failed: 
%s", directory_, status.ToString());
+    logger_->log_error("MiNiFi Provenance Repository database open %s failed", 
directory_);
     return false;
   }
 
   return true;
 }
 
-bool ProvenanceRepository::Put(const std::string& key, const uint8_t *buf, 
size_t bufLen) {
-  // persist to the DB
-  rocksdb::Slice value((const char *) buf, bufLen);
-  return db_->Put(rocksdb::WriteOptions(), key, value).ok();
-}
-
-bool ProvenanceRepository::MultiPut(const std::vector<std::pair<std::string, 
std::unique_ptr<minifi::io::BufferStream>>>& data) {
-  rocksdb::WriteBatch batch;
-  for (const auto &item : data) {
-    const auto buf = item.second->getBuffer().as_span<const char>();
-    rocksdb::Slice value(buf.data(), buf.size());
-    if (!batch.Put(item.first, value).ok()) {
-      return false;
-    }
-  }
-  return db_->Write(rocksdb::WriteOptions(), &batch).ok();
-}
-
-bool ProvenanceRepository::Get(const std::string &key, std::string &value) {
-  return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
-}
-
 bool 
ProvenanceRepository::getElements(std::vector<std::shared_ptr<core::SerializableComponent>>
 &records, size_t &max_size) {
-  std::unique_ptr<rocksdb::Iterator> 
it(db_->NewIterator(rocksdb::ReadOptions()));
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
+  std::unique_ptr<rocksdb::Iterator> 
it(opendb->NewIterator(rocksdb::ReadOptions()));
   size_t requested_batch = max_size;
   max_size = 0;
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
@@ -144,13 +111,6 @@ void ProvenanceRepository::destroy() {
   db_.reset();
 }
 
-uint64_t ProvenanceRepository::getKeyCount() const {
-  std::string key_count;
-  db_->GetProperty("rocksdb.estimate-num-keys", &key_count);
-
-  return std::stoull(key_count);
-}
-
 REGISTER_RESOURCE_AS(ProvenanceRepository, InternalResource, 
("ProvenanceRepository", "provenancerepository"));
 
 }  // namespace org::apache::nifi::minifi::provenance
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h 
b/extensions/rocksdb-repos/ProvenanceRepository.h
index 7aaeaa34e..ca0b6a8ad 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -23,14 +23,13 @@
 #include <algorithm>
 #include <utility>
 
-#include "rocksdb/db.h"
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
 #include "core/Core.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "core/ThreadedRepository.h"
 #include "provenance/Provenance.h"
 #include "utils/Literals.h"
+#include "RocksDbRepository.h"
 
 namespace org::apache::nifi::minifi::provenance {
 
@@ -39,7 +38,7 @@ constexpr auto MAX_PROVENANCE_STORAGE_SIZE = 10_MiB;
 constexpr auto MAX_PROVENANCE_ENTRY_LIFE_TIME = std::chrono::minutes(1);
 constexpr auto PROVENANCE_PURGE_PERIOD = std::chrono::milliseconds(2500);
 
-class ProvenanceRepository : public core::ThreadedRepository {
+class ProvenanceRepository : public core::repository::RocksDbRepository {
  public:
   ProvenanceRepository(std::string name, const utils::Identifier& /*uuid*/)
     : ProvenanceRepository(std::move(name)) {
@@ -50,8 +49,8 @@ class ProvenanceRepository : public core::ThreadedRepository {
                                 std::chrono::milliseconds maxPartitionMillis = 
MAX_PROVENANCE_ENTRY_LIFE_TIME,
                                 int64_t maxPartitionBytes = 
MAX_PROVENANCE_STORAGE_SIZE,
                                 std::chrono::milliseconds purgePeriod = 
PROVENANCE_PURGE_PERIOD)
-    : ThreadedRepository(repo_name.length() > 0 ? std::move(repo_name) : 
core::getClassName<ProvenanceRepository>(),
-        directory, maxPartitionMillis, maxPartitionBytes, purgePeriod) {
+    : RocksDbRepository(repo_name.length() > 0 ? std::move(repo_name) : 
core::getClassName<ProvenanceRepository>(),
+        directory, maxPartitionMillis, maxPartitionBytes, purgePeriod, 
core::logging::LoggerFactory<ProvenanceRepository>::getLogger()) {
   }
 
   ~ProvenanceRepository() override {
@@ -62,27 +61,19 @@ class ProvenanceRepository : public 
core::ThreadedRepository {
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
 
-  void printStats();
-
   bool isNoop() const override {
     return false;
   }
 
   bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> 
&config) override;
 
-  bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override;
-  bool MultiPut(const std::vector<std::pair<std::string, 
std::unique_ptr<minifi::io::BufferStream>>>& data) override;
-
   bool Delete(const std::string& /*key*/) override {
     // The repo is cleaned up by itself, there is no need to delete items.
     return true;
   }
-
-  bool Get(const std::string &key, std::string &value) override;
   bool getElements(std::vector<std::shared_ptr<core::SerializableComponent>> 
&records, size_t &max_size) override;
 
   void destroy();
-  uint64_t getKeyCount() const;
 
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
@@ -93,14 +84,6 @@ class ProvenanceRepository : public core::ThreadedRepository 
{
  private:
   // Run function for the thread
   void run() override;
-
-  std::thread& getThread() override {
-    return thread_;
-  }
-
-  std::unique_ptr<rocksdb::DB> db_;
-  std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<ProvenanceRepository>::getLogger();
-  std::thread thread_;
 };
 
 }  // namespace org::apache::nifi::minifi::provenance
diff --git a/extensions/rocksdb-repos/RocksDbRepository.cpp 
b/extensions/rocksdb-repos/RocksDbRepository.cpp
new file mode 100644
index 000000000..02a735395
--- /dev/null
+++ b/extensions/rocksdb-repos/RocksDbRepository.cpp
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "RocksDbRepository.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::core::repository {
+
+void RocksDbRepository::printStats() {
+  auto opendb = db_->open();
+  if (!opendb) {
+    return;
+  }
+  std::string key_count;
+  opendb->GetProperty("rocksdb.estimate-num-keys", &key_count);
+
+  std::string table_readers;
+  opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
+
+  std::string all_memtables;
+  opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
+
+  logger_->log_info("Repository stats: key count: %s, table readers size: %s, 
all memory tables size: %s",
+      key_count, table_readers, all_memtables);
+}
+
+bool RocksDbRepository::ExecuteWithRetry(const 
std::function<rocksdb::Status()>& operation) {
+  constexpr int RETRY_COUNT = 3;
+  std::chrono::milliseconds wait_time = 0ms;
+  for (int i=0; i < RETRY_COUNT; ++i) {
+    auto status = operation();
+    if (status.ok()) {
+      logger_->log_trace("Rocksdb operation executed successfully");
+      return true;
+    }
+    logger_->log_error("Rocksdb operation failed: %s", status.ToString());
+    wait_time += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS;
+    std::this_thread::sleep_for(wait_time);
+  }
+  return false;
+}
+
+bool RocksDbRepository::Put(const std::string& key, const uint8_t *buf, size_t 
bufLen) {
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
+  rocksdb::Slice value((const char *) buf, bufLen);
+  auto operation = [&key, &value, &opendb]() { return 
opendb->Put(rocksdb::WriteOptions(), key, value); };
+  return ExecuteWithRetry(operation);
+}
+
+bool RocksDbRepository::MultiPut(const std::vector<std::pair<std::string, 
std::unique_ptr<minifi::io::BufferStream>>>& data) {
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
+  auto batch = opendb->createWriteBatch();
+  for (const auto &item : data) {
+    const auto buf = item.second->getBuffer().as_span<const char>();
+    rocksdb::Slice value(buf.data(), buf.size());
+    if (!batch.Put(item.first, value).ok()) {
+      logger_->log_error("Failed to add item to batch operation");
+      return false;
+    }
+  }
+  auto operation = [&batch, &opendb]() { return 
opendb->Write(rocksdb::WriteOptions(), &batch); };
+  return ExecuteWithRetry(operation);
+}
+
+bool RocksDbRepository::Get(const std::string &key, std::string &value) {
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
+  return opendb->Get(rocksdb::ReadOptions(), key, &value).ok();
+}
+
+uint64_t RocksDbRepository::getRepositorySize() const {
+  return (utils::optional_from_ptr(db_.get()) |
+          utils::flatMap([](const auto& db) { return db->open(); }) |
+          utils::flatMap([](const auto& opendb) { return 
opendb.getApproximateSizes(); })).value_or(0);
+}
+
+uint64_t RocksDbRepository::getRepositoryEntryCount() const {
+  return (utils::optional_from_ptr(db_.get()) |
+          utils::flatMap([](const auto& db) { return db->open(); }) |
+          utils::flatMap([](auto&& opendb) -> std::optional<uint64_t> {
+              std::string key_count;
+              opendb.GetProperty("rocksdb.estimate-num-keys", &key_count);
+              if (!key_count.empty()) {
+                return std::stoull(key_count);
+              }
+              return std::nullopt;
+            })).value_or(0);
+}
+
+}  // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/RocksDbRepository.h 
b/extensions/rocksdb-repos/RocksDbRepository.h
new file mode 100644
index 000000000..6fd220d8d
--- /dev/null
+++ b/extensions/rocksdb-repos/RocksDbRepository.h
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <utility>
+#include <vector>
+#include <string>
+#include <memory>
+
+#include "database/RocksDatabase.h"
+#include "core/ThreadedRepository.h"
+
+namespace org::apache::nifi::minifi::core::repository {
+
+constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = 
std::chrono::milliseconds(500);
+
+class RocksDbRepository : public ThreadedRepository {
+ public:
+  RocksDbRepository(std::string repo_name,
+                    std::string directory,
+                    std::chrono::milliseconds max_partition_millis,
+                    int64_t max_partition_bytes,
+                    std::chrono::milliseconds purge_period,
+                    std::shared_ptr<logging::Logger> logger)
+    : ThreadedRepository(std::move(repo_name), std::move(directory), 
max_partition_millis, max_partition_bytes, purge_period),
+      logger_(std::move(logger)) {
+  }
+
+  bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override;
+  bool MultiPut(const std::vector<std::pair<std::string, 
std::unique_ptr<minifi::io::BufferStream>>>& data) override;
+  bool Get(const std::string &key, std::string &value) override;
+
+  uint64_t getRepositorySize() const override;
+  uint64_t getRepositoryEntryCount() const override;
+  void printStats();
+
+ protected:
+  bool ExecuteWithRetry(const std::function<rocksdb::Status()>& operation);
+
+  std::thread& getThread() override {
+    return thread_;
+  }
+
+  std::unique_ptr<minifi::internal::RocksDatabase> db_;
+  std::shared_ptr<logging::Logger> logger_;
+  std::thread thread_;
+};
+
+}  // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp 
b/extensions/rocksdb-repos/database/OpenRocksDb.cpp
index be5f1960c..703c66a39 100644
--- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp
+++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp
@@ -122,4 +122,17 @@ rocksdb::DB* OpenRocksDb::get() {
   return impl_.get();
 }
 
+std::optional<uint64_t> OpenRocksDb::getApproximateSizes() const {
+  const rocksdb::SizeApproximationOptions options{ .include_memtables = true };
+  std::string del_char_str(1, static_cast<char>(127));
+  std::string empty_str;
+  const rocksdb::Range range(empty_str, del_char_str);
+  uint64_t value = 0;
+  auto status = impl_->GetApproximateSizes(options, column_->handle.get(), 
&range, 1, &value);
+  if (status.ok()) {
+    return value;
+  }
+  return std::nullopt;
+}
+
 }  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.h 
b/extensions/rocksdb-repos/database/OpenRocksDb.h
index ebe25aacc..b4d964381 100644
--- a/extensions/rocksdb-repos/database/OpenRocksDb.h
+++ b/extensions/rocksdb-repos/database/OpenRocksDb.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <string>
 #include <vector>
+#include <optional>
 #include "utils/gsl.h"
 
 #include "rocksdb/db.h"
@@ -71,6 +72,8 @@ class OpenRocksDb {
 
   rocksdb::DB* get();
 
+  std::optional<uint64_t> getApproximateSizes() const;
+
  private:
   void handleResult(const rocksdb::Status& result);
   void handleResult(const std::vector<rocksdb::Status>& results);
diff --git a/libminifi/include/core/ContentRepository.h 
b/libminifi/include/core/ContentRepository.h
index 17762af26..68230d42b 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -26,22 +26,20 @@
 #include "properties/Configure.h"
 #include "ResourceClaim.h"
 #include "StreamManager.h"
-#include "core/Core.h"
 #include "ContentSession.h"
+#include "core/RepositoryMetricsSource.h"
+#include "core/Core.h"
 
 namespace org::apache::nifi::minifi::core {
 
 /**
  * Content repository definition that extends StreamManager.
  */
-class ContentRepository : public StreamManager<minifi::ResourceClaim>, public 
utils::EnableSharedFromThis<ContentRepository>, public core::CoreComponent {
+class ContentRepository : public core::CoreComponent, public 
StreamManager<minifi::ResourceClaim>, public 
utils::EnableSharedFromThis<ContentRepository>, public 
core::RepositoryMetricsSource {
  public:
   explicit ContentRepository(std::string name, const utils::Identifier& uuid = 
{}) : core::CoreComponent(std::move(name), uuid) {}
   ~ContentRepository() override = default;
 
-  /**
-   * initialize this content repository using the provided configuration.
-   */
   virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0;
 
   std::string getStoragePath() const override;
@@ -59,6 +57,10 @@ class ContentRepository : public 
StreamManager<minifi::ResourceClaim>, public ut
 
   bool remove(const minifi::ResourceClaim &streamId) final;
 
+  std::string getRepositoryName() const override {
+    return getName();
+  }
+
  protected:
   void removeFromPurgeList();
   virtual bool removeKey(const std::string& content_path) = 0;
diff --git a/libminifi/include/core/Repository.h 
b/libminifi/include/core/Repository.h
index 77fdfbec3..0c74732bc 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -29,19 +29,20 @@
 #include <utility>
 #include <vector>
 
-#include "Core.h"
 #include "ResourceClaim.h"
 #include "core/Connectable.h"
 #include "core/ContentRepository.h"
 #include "core/Property.h"
 #include "core/SerializableComponent.h"
 #include "core/logging/LoggerFactory.h"
+#include "core/RepositoryMetricsSource.h"
 #include "properties/Configure.h"
 #include "utils/BackTrace.h"
 #include "SwapManager.h"
 #include "utils/Literals.h"
 #include "utils/StringUtils.h"
 #include "utils/TimeUtil.h"
+#include "core/Core.h"
 
 #ifndef WIN32
 #include <sys/stat.h>
@@ -54,7 +55,7 @@ constexpr auto MAX_REPOSITORY_STORAGE_SIZE = 10_MiB;
 constexpr auto MAX_REPOSITORY_ENTRY_LIFE_TIME = std::chrono::minutes(10);
 constexpr auto REPOSITORY_PURGE_PERIOD = std::chrono::milliseconds(2500);
 
-class Repository : public core::CoreComponent {
+class Repository : public core::CoreComponent, public 
core::RepositoryMetricsSource {
  public:
   explicit Repository(std::string repo_name = "Repository",
                       std::string directory = REPOSITORY_DIRECTORY,
@@ -62,20 +63,15 @@ class Repository : public core::CoreComponent {
                       int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
                       std::chrono::milliseconds purgePeriod = 
REPOSITORY_PURGE_PERIOD)
     : core::CoreComponent(std::move(repo_name)),
-      directory_(std::move(directory)),
       max_partition_millis_(maxPartitionMillis),
       max_partition_bytes_(maxPartitionBytes),
       purge_period_(purgePeriod),
       repo_full_(false),
-      repo_size_(0),
+      directory_(std::move(directory)),
       logger_(logging::LoggerFactory<Repository>::getLogger()) {
   }
 
-  virtual bool isRunning() const {
-    return true;
-  }
-
-  virtual bool initialize(const std::shared_ptr<Configure>& /*configure*/) = 0;
+  virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0;
   virtual bool start() = 0;
   virtual bool stop() = 0;
 
@@ -116,10 +112,6 @@ class Repository : public core::CoreComponent {
     return false;
   }
 
-  virtual bool isFull() {
-    return repo_full_;
-  }
-
   virtual bool 
getElements(std::vector<std::shared_ptr<core::SerializableComponent>>& 
/*store*/, size_t& /*max_size*/) {
     return true;
   }
@@ -129,14 +121,14 @@ class Repository : public core::CoreComponent {
   virtual void loadComponent(const std::shared_ptr<core::ContentRepository>& 
/*content_repo*/) {
   }
 
-  virtual uint64_t getRepoSize() const {
-    return repo_size_;
-  }
-
   std::string getDirectory() const {
     return directory_;
   }
 
+  std::string getRepositoryName() const override {
+    return getName();
+  }
+
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
   Repository(const Repository &parent) = delete;
@@ -146,15 +138,13 @@ class Repository : public core::CoreComponent {
   std::map<std::string, core::Connectable*> containers_;
 
   std::map<std::string, core::Connectable*> connection_map_;
-  std::string directory_;
   // max db entry lifetime
   std::chrono::milliseconds max_partition_millis_;
   // max db size
   int64_t max_partition_bytes_;
   std::chrono::milliseconds purge_period_;
   std::atomic<bool> repo_full_;
-
-  std::atomic<uint64_t> repo_size_;
+  std::string directory_;
 
  private:
   std::shared_ptr<logging::Logger> logger_;
diff --git a/libminifi/include/core/RepositoryFactory.h 
b/libminifi/include/core/RepositoryFactory.h
index 60ba5775a..352dec51f 100644
--- a/libminifi/include/core/RepositoryFactory.h
+++ b/libminifi/include/core/RepositoryFactory.h
@@ -16,8 +16,7 @@
  * limitations under the License.
  */
 
-#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_
-#define LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_
+#pragma once
 
 #include <memory>
 #include <string>
@@ -45,5 +44,3 @@ std::unique_ptr<core::ContentRepository> 
createContentRepository(const std::stri
 std::unique_ptr<core::Repository> createRepository(const std::string& 
configuration_class_name, const std::string& repo_name = "");
 
 }  // namespace org::apache::nifi::minifi::core
-
-#endif  // LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_
diff --git a/libminifi/include/core/RepositoryMetricsSource.h 
b/libminifi/include/core/RepositoryMetricsSource.h
new file mode 100644
index 000000000..cfaeb4ae6
--- /dev/null
+++ b/libminifi/include/core/RepositoryMetricsSource.h
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+namespace org::apache::nifi::minifi::core {
+
+class RepositoryMetricsSource {
+ public:
+  virtual ~RepositoryMetricsSource() = default;
+  virtual uint64_t getRepositorySize() const = 0;
+  virtual uint64_t getRepositoryEntryCount() const = 0;
+  virtual std::string getRepositoryName() const = 0;
+
+  virtual uint64_t getMaxRepositorySize() const {
+    return 0;
+  }
+
+  virtual bool isFull() const {
+    return false;
+  }
+
+  virtual bool isRunning() const {
+    return true;
+  }
+};
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/repository/FileSystemRepository.h 
b/libminifi/include/core/repository/FileSystemRepository.h
index b0c3900cd..99a26b4e3 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -21,10 +21,12 @@
 #include <memory>
 #include <string>
 #include <utility>
+#include <algorithm>
 
 #include "../ContentRepository.h"
 #include "properties/Configure.h"
 #include "core/logging/LoggerFactory.h"
+#include "utils/file/FileUtils.h"
 
 namespace org::apache::nifi::minifi::core::repository {
 
@@ -50,6 +52,18 @@ class FileSystemRepository : public core::ContentRepository {
 
   void clearOrphans() override;
 
+  uint64_t getRepositorySize() const override {
+    return utils::file::path_size(directory_);
+  }
+
+  uint64_t getRepositoryEntryCount() const override {
+    auto dir_it = std::filesystem::recursive_directory_iterator(directory_, 
std::filesystem::directory_options::skip_permission_denied);
+    return std::count_if(
+      std::filesystem::begin(dir_it),
+      std::filesystem::end(dir_it),
+      [](auto& entry) { return entry.is_regular_file(); });
+  }
+
  protected:
   bool removeKey(const std::string& content_path) override;
 
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h 
b/libminifi/include/core/repository/VolatileContentRepository.h
index a5f6027e9..0690255f7 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -24,12 +24,12 @@
 #include "AtomicRepoEntries.h"
 #include "io/AtomicEntryStream.h"
 #include "../ContentRepository.h"
-#include "core/repository/VolatileRepository.h"
 #include "properties/Configure.h"
 #include "core/Connectable.h"
 #include "core/logging/LoggerFactory.h"
 #include "utils/GeneralUtils.h"
 #include "VolatileRepositoryData.h"
+#include "utils/Literals.h"
 
 namespace org::apache::nifi::minifi::core::repository {
 /**
@@ -58,6 +58,22 @@ class VolatileContentRepository : public 
core::ContentRepository {
     }
   }
 
+  uint64_t getRepositorySize() const override {
+    return repo_data_.getRepositorySize();
+  }
+
+  uint64_t getMaxRepositorySize() const override {
+    return repo_data_.getMaxRepositorySize();
+  }
+
+  uint64_t getRepositoryEntryCount() const override {
+    return master_list_.size();
+  }
+
+  bool isFull() const override {
+    return repo_data_.isFull();
+  }
+
   /**
    * Initialize the volatile content repo
    * @param configure configuration
diff --git a/libminifi/include/core/repository/VolatileRepository.h 
b/libminifi/include/core/repository/VolatileRepository.h
index 6277b2a12..f22ace0ad 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -81,8 +81,20 @@ class VolatileRepository : public core::ThreadedRepository {
    */
   bool Get(const std::string& key, std::string &value) override;
 
-  uint64_t getRepoSize() const override {
-    return repo_data_.current_size;
+  uint64_t getRepositorySize() const override {
+    return repo_data_.getRepositorySize();
+  }
+
+  uint64_t getMaxRepositorySize() const override {
+    return repo_data_.getMaxRepositorySize();
+  }
+
+  uint64_t getRepositoryEntryCount() const override {
+    return repo_data_.getRepositoryEntryCount();
+  }
+
+  bool isFull() const override {
+    return repo_data_.isFull();
   }
 
  protected:
diff --git a/libminifi/include/core/repository/VolatileRepositoryData.h 
b/libminifi/include/core/repository/VolatileRepositoryData.h
index a428126c5..86d8161b7 100644
--- a/libminifi/include/core/repository/VolatileRepositoryData.h
+++ b/libminifi/include/core/repository/VolatileRepositoryData.h
@@ -35,8 +35,26 @@ struct VolatileRepositoryData {
 
   void initialize(const std::shared_ptr<Configure> &configure, const 
std::string& repo_name);
   void clear();
+
+  uint64_t getRepositorySize() const {
+    return current_size;
+  }
+
+  uint64_t getMaxRepositorySize() const {
+    return max_size;
+  }
+
+  uint64_t getRepositoryEntryCount() const {
+    return current_entry_count;
+  }
+
+  bool isFull() const {
+    return current_size >= max_size;
+  }
+
   // current size of the volatile repo.
   std::atomic<size_t> current_size;
+  std::atomic<size_t> current_entry_count;
   // value vector that exists for non blocking iteration over
   // objects that store data for this repo instance.
   std::vector<AtomicEntry<std::string>*> value_vector;
diff --git a/libminifi/include/core/state/MetricsPublisherStore.h 
b/libminifi/include/core/state/MetricsPublisherStore.h
index 90eab8aa1..b4fafaa92 100644
--- a/libminifi/include/core/state/MetricsPublisherStore.h
+++ b/libminifi/include/core/state/MetricsPublisherStore.h
@@ -21,6 +21,7 @@
 #include <string>
 #include <utility>
 #include <memory>
+#include <vector>
 
 #include "MetricsPublisher.h"
 #include "core/state/nodes/ResponseNodeLoader.h"
@@ -31,8 +32,8 @@ namespace org::apache::nifi::minifi::state {
 
 class MetricsPublisherStore {
  public:
-  MetricsPublisherStore(std::shared_ptr<Configure> configuration, 
std::shared_ptr<core::Repository> provenance_repo,
-    std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<core::FlowConfiguration> flow_configuration);
+  MetricsPublisherStore(std::shared_ptr<Configure> configuration, const 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& 
repository_metric_sources,
+    std::shared_ptr<core::FlowConfiguration> flow_configuration);
   void initialize(core::controller::ControllerServiceProvider* controller, 
state::StateMonitor* update_sink);
   void loadMetricNodes(core::ProcessGroup* root);
   void clearMetricNodes();
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h 
b/libminifi/include/core/state/nodes/AgentInformation.h
index d38465295..0d18c721e 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -61,6 +61,7 @@
 #include "core/AgentIdentificationProvider.h"
 #include "utils/Export.h"
 #include "SupportedOperations.h"
+#include "core/RepositoryMetricsSource.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
@@ -423,13 +424,13 @@ class AgentStatus : public StateMonitorNode {
     return "AgentStatus";
   }
 
-  void setRepositories(const std::map<std::string, 
std::shared_ptr<core::Repository>> &repositories) {
+  void setRepositories(const std::map<std::string, 
std::shared_ptr<core::RepositoryMetricsSource>> &repositories) {
     repositories_ = repositories;
   }
 
-  void addRepository(const std::shared_ptr<core::Repository> &repo) {
+  void addRepository(const std::shared_ptr<core::RepositoryMetricsSource> 
&repo) {
     if (nullptr != repo) {
-      repositories_.insert(std::make_pair(repo->getName(), repo));
+      repositories_.insert(std::make_pair(repo->getRepositoryName(), repo));
     }
   }
 
@@ -454,9 +455,11 @@ class AgentStatus : public StateMonitorNode {
   std::vector<PublishedMetric> calculateMetrics() override {
     std::vector<PublishedMetric> metrics;
     for (const auto& [_, repo] : repositories_) {
-      metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), 
{{"metric_class", getName()}, {"repository_name", repo->getName()}}});
-      metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), 
{{"metric_class", getName()}, {"repository_name", repo->getName()}}});
-      metrics.push_back({"repository_size", 
static_cast<double>(repo->getRepoSize()), {{"metric_class", getName()}, 
{"repository_name", repo->getName()}}});
+      metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), 
{{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}});
+      metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), 
{{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}});
+      metrics.push_back({"repository_size_bytes", 
static_cast<double>(repo->getRepositorySize()), {{"metric_class", getName()}, 
{"repository_name", repo->getRepositoryName()}}});
+      metrics.push_back({"max_repository_size_bytes", 
static_cast<double>(repo->getMaxRepositorySize()), {{"metric_class", 
getName()}, {"repository_name", repo->getRepositoryName()}}});
+      metrics.push_back({"repository_entry_count", 
static_cast<double>(repo->getRepositoryEntryCount()), {{"metric_class", 
getName()}, {"repository_name", repo->getRepositoryName()}}});
     }
     if (nullptr != monitor_) {
       auto uptime = monitor_->getUptime();
@@ -488,26 +491,36 @@ class AgentStatus : public StateMonitorNode {
     repositories.name = "repositories";
 
     for (const auto& repo : repositories_) {
-      SerializedResponseNode repoNode;
-      repoNode.collapsible = false;
-      repoNode.name = repo.first;
-
-      SerializedResponseNode queuesize;
-      queuesize.name = "size";
-      queuesize.value = repo.second->getRepoSize();
-
-      SerializedResponseNode isRunning;
-      isRunning.name = "running";
-      isRunning.value = repo.second->isRunning();
-
-      SerializedResponseNode isFull;
-      isFull.name = "full";
-      isFull.value = repo.second->isFull();
-
-      repoNode.children.push_back(queuesize);
-      repoNode.children.push_back(isRunning);
-      repoNode.children.push_back(isFull);
-      repositories.children.push_back(repoNode);
+      SerializedResponseNode repo_node;
+      repo_node.collapsible = false;
+      repo_node.name = repo.first;
+
+      SerializedResponseNode repo_size;
+      repo_size.name = "size";
+      repo_size.value = repo.second->getRepositorySize();
+
+      SerializedResponseNode max_repo_size;
+      max_repo_size.name = "maxSize";
+      max_repo_size.value = repo.second->getMaxRepositorySize();
+
+      SerializedResponseNode repo_entry_count;
+      repo_entry_count.name = "entryCount";
+      repo_entry_count.value = repo.second->getRepositoryEntryCount();
+
+      SerializedResponseNode is_running;
+      is_running.name = "running";
+      is_running.value = repo.second->isRunning();
+
+      SerializedResponseNode is_full;
+      is_full.name = "full";
+      is_full.value = repo.second->isFull();
+
+      repo_node.children.push_back(repo_size);
+      repo_node.children.push_back(max_repo_size);
+      repo_node.children.push_back(repo_entry_count);
+      repo_node.children.push_back(is_running);
+      repo_node.children.push_back(is_full);
+      repositories.children.push_back(repo_node);
     }
     return repositories;
   }
@@ -578,7 +591,7 @@ class AgentStatus : public StateMonitorNode {
     return resource_consumption;
   }
 
-  std::map<std::string, std::shared_ptr<core::Repository>> repositories_;
+  std::map<std::string, std::shared_ptr<core::RepositoryMetricsSource>> 
repositories_;
 
   MINIFIAPI static utils::ProcessCpuUsageTracker cpu_load_tracker_;
   MINIFIAPI static std::mutex cpu_load_tracker_mutex_;
@@ -608,9 +621,9 @@ class AgentMonitor {
   AgentMonitor()
       : monitor_(nullptr) {
   }
-  void addRepository(const std::shared_ptr<core::Repository> &repo) {
+  void addRepository(const std::shared_ptr<core::RepositoryMetricsSource> 
&repo) {
     if (nullptr != repo) {
-      repositories_.insert(std::make_pair(repo->getName(), repo));
+      repositories_.insert(std::make_pair(repo->getRepositoryName(), repo));
     }
   }
 
@@ -619,7 +632,7 @@ class AgentMonitor {
   }
 
  protected:
-  std::map<std::string, std::shared_ptr<core::Repository>> repositories_;
+  std::map<std::string, std::shared_ptr<core::RepositoryMetricsSource>> 
repositories_;
   state::StateMonitor* monitor_ = nullptr;
 };
 
diff --git a/libminifi/include/core/state/nodes/RepositoryMetrics.h 
b/libminifi/include/core/state/nodes/RepositoryMetrics.h
index 599730bef..bc8dada72 100644
--- a/libminifi/include/core/state/nodes/RepositoryMetrics.h
+++ b/libminifi/include/core/state/nodes/RepositoryMetrics.h
@@ -26,6 +26,7 @@
 
 #include "../nodes/MetricsBase.h"
 #include "Connection.h"
+#include "core/RepositoryMetricsSource.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
@@ -54,33 +55,42 @@ class RepositoryMetrics : public ResponseNode {
     return "RepositoryMetrics";
   }
 
-  void addRepository(const std::shared_ptr<core::Repository> &repo) {
+  void addRepository(const std::shared_ptr<core::RepositoryMetricsSource> 
&repo) {
     if (nullptr != repo) {
-      repositories_.insert(std::make_pair(repo->getName(), repo));
+      repositories_.push_back(repo);
     }
   }
 
   std::vector<SerializedResponseNode> serialize() override {
     std::vector<SerializedResponseNode> serialized;
-    for (auto conn : repositories_) {
-      auto repo = conn.second;
+    for (const auto& repo : repositories_) {
       SerializedResponseNode parent;
-      parent.name = repo->getName();
-      SerializedResponseNode datasize;
-      datasize.name = "running";
-      datasize.value = repo->isRunning();
+      parent.name = repo->getRepositoryName();
+      SerializedResponseNode is_running;
+      is_running.name = "running";
+      is_running.value = repo->isRunning();
 
-      SerializedResponseNode datasizemax;
-      datasizemax.name = "full";
-      datasizemax.value = repo->isFull();
+      SerializedResponseNode is_full;
+      is_full.name = "full";
+      is_full.value = repo->isFull();
 
-      SerializedResponseNode queuesize;
-      queuesize.name = "size";
-      queuesize.value = std::to_string(repo->getRepoSize());
+      SerializedResponseNode repo_size;
+      repo_size.name = "size";
+      repo_size.value = std::to_string(repo->getRepositorySize());
 
-      parent.children.push_back(datasize);
-      parent.children.push_back(datasizemax);
-      parent.children.push_back(queuesize);
+      SerializedResponseNode max_repo_size;
+      max_repo_size.name = "maxSize";
+      max_repo_size.value = std::to_string(repo->getMaxRepositorySize());
+
+      SerializedResponseNode repo_entry_count;
+      repo_entry_count.name = "entryCount";
+      repo_entry_count.value = repo->getRepositoryEntryCount();
+
+      parent.children.push_back(is_running);
+      parent.children.push_back(is_full);
+      parent.children.push_back(repo_size);
+      parent.children.push_back(max_repo_size);
+      parent.children.push_back(repo_entry_count);
 
       serialized.push_back(parent);
     }
@@ -89,16 +99,18 @@ class RepositoryMetrics : public ResponseNode {
 
   std::vector<PublishedMetric> calculateMetrics() override {
     std::vector<PublishedMetric> metrics;
-    for (const auto& [_, repo] : repositories_) {
-      metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), 
{{"metric_class", getName()}, {"repository_name", repo->getName()}}});
-      metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), 
{{"metric_class", getName()}, {"repository_name", repo->getName()}}});
-      metrics.push_back({"repository_size", 
static_cast<double>(repo->getRepoSize()), {{"metric_class", getName()}, 
{"repository_name", repo->getName()}}});
+    for (const auto& repo : repositories_) {
+      metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), 
{{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}});
+      metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), 
{{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}});
+      metrics.push_back({"repository_size_bytes", 
static_cast<double>(repo->getRepositorySize()), {{"metric_class", getName()}, 
{"repository_name", repo->getRepositoryName()}}});
+      metrics.push_back({"max_repository_size_bytes", 
static_cast<double>(repo->getMaxRepositorySize()), {{"metric_class", 
getName()}, {"repository_name", repo->getRepositoryName()}}});
+      metrics.push_back({"repository_entry_count", 
static_cast<double>(repo->getRepositoryEntryCount()), {{"metric_class", 
getName()}, {"repository_name", repo->getRepositoryName()}}});
     }
     return metrics;
   }
 
  protected:
-  std::map<std::string, std::shared_ptr<core::Repository>> repositories_;
+  std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repositories_;
 };
 
 }  // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h 
b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
index 719d2e9a5..af330585c 100644
--- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -33,13 +33,14 @@
 #include "utils/gsl.h"
 #include "utils/Id.h"
 #include "utils/expected.h"
+#include "core/RepositoryMetricsSource.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
 class ResponseNodeLoader {
  public:
-  ResponseNodeLoader(std::shared_ptr<Configure> configuration, 
std::shared_ptr<core::Repository> provenance_repo,
-    std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<core::FlowConfiguration> flow_configuration);
+  ResponseNodeLoader(std::shared_ptr<Configure> configuration, 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repository_metric_sources,
+    std::shared_ptr<core::FlowConfiguration> flow_configuration);
 
   void setNewConfigRoot(core::ProcessGroup* root);
   void clearConfigRoot();
@@ -70,8 +71,7 @@ class ResponseNodeLoader {
   std::unordered_map<std::string, std::vector<SharedResponseNode>> 
component_metrics_;
   std::unordered_map<std::string, SharedResponseNode> system_metrics_;
   std::shared_ptr<Configure> configuration_;
-  std::shared_ptr<core::Repository> provenance_repo_;
-  std::shared_ptr<core::Repository> flow_file_repo_;
+  std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repository_metric_sources_;
   std::shared_ptr<core::FlowConfiguration> flow_configuration_;
   core::controller::ControllerServiceProvider* controller_{};
   state::StateMonitor* update_sink_{};
diff --git a/libminifi/include/utils/file/FileUtils.h 
b/libminifi/include/utils/file/FileUtils.h
index db37d861b..ba778f583 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -206,6 +206,30 @@ inline bool is_directory(const std::filesystem::path 
&path) {
   return false;
 }
 
+inline uint64_t path_size(const std::filesystem::path& path) {
+  // Best-effort size in bytes of a regular file, or of all regular files below a directory.
+  // Filesystem errors (e.g. an entry removed while iterating) are skipped instead of thrown.
+  std::error_code ec;
+  if (std::filesystem::is_regular_file(path, ec)) {
+    return utils::file::file_size(path);
+  }
+  uint64_t size = 0;
+  if (!utils::file::is_directory(path)) {
+    return size;
+  }
+  std::filesystem::recursive_directory_iterator it(path, std::filesystem::directory_options::skip_permission_denied, ec);
+  for (; !ec && it != std::filesystem::recursive_directory_iterator(); it.increment(ec)) {
+    std::error_code entry_ec;
+    if (it->is_regular_file(entry_ec)) {
+      const auto entry_size = it->file_size(entry_ec);
+      if (!entry_ec) {
+        size += entry_size;
+      }
+    }
+  }
+  return size;
+}
+
 inline bool exists(const std::filesystem::path &path) {
   std::error_code ec;
   bool result = std::filesystem::exists(path, ec);
diff --git a/libminifi/src/core/RepositoryFactory.cpp 
b/libminifi/src/core/RepositoryFactory.cpp
index 8455043cb..bc79e136d 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -37,6 +37,7 @@ std::unique_ptr<core::ContentRepository> 
createContentRepository(const std::stri
     auto return_obj = 
core::ClassLoader::getDefaultClassLoader().instantiate<core::ContentRepository>(class_name_lc,
                                                                                
                       class_name_lc);
     if (return_obj) {
+      return_obj->setName(repo_name);
       return return_obj;
     }
     if (class_name_lc == "volatilecontentrepository") {
@@ -68,6 +69,14 @@ class NoOpThreadedRepository : public 
core::ThreadedRepository {
     stop();
   }
 
+  uint64_t getRepositorySize() const override {
+    return 0;
+  }
+
+  uint64_t getRepositoryEntryCount() const override {
+    return 0;
+  }
+
  private:
   void run() override {
   }
diff --git a/libminifi/src/core/repository/VolatileRepository.cpp 
b/libminifi/src/core/repository/VolatileRepository.cpp
index 5171376cd..161f9f357 100644
--- a/libminifi/src/core/repository/VolatileRepository.cpp
+++ b/libminifi/src/core/repository/VolatileRepository.cpp
@@ -71,6 +71,9 @@ bool VolatileRepository::Put(const std::string& key, const 
uint8_t *buf, size_t
     }
   } while (!updated);
   repo_data_.current_size += size;
+  if (repo_data_.current_entry_count < repo_data_.max_count) {
+    ++repo_data_.current_entry_count;
+  }
 
   logger_->log_debug("VolatileRepository -- put %zu %" PRIu32, 
repo_data_.current_size.load(), current_index_.load());
   return true;
@@ -92,6 +95,7 @@ bool VolatileRepository::Delete(const std::string& key) {
     RepoValue<std::string> value;
     if (ent->getValue(key, value)) {
       repo_data_.current_size -= value.size();
+      --repo_data_.current_entry_count;
       logger_->log_debug("Delete and pushed into purge_list from volatile");
       emplace(value);
       return true;
@@ -102,10 +106,8 @@ bool VolatileRepository::Delete(const std::string& key) {
 
 bool VolatileRepository::Get(const std::string &key, std::string &value) {
   for (auto ent : repo_data_.value_vector) {
-    // let the destructor do the cleanup
     RepoValue<std::string> repo_value;
     if (ent->getValue(key, repo_value)) {
-      repo_data_.current_size -= value.size();
       repo_value.emplace(value);
       return true;
     }
diff --git a/libminifi/src/core/repository/VolatileRepositoryData.cpp 
b/libminifi/src/core/repository/VolatileRepositoryData.cpp
index a5ab41aff..09633ff40 100644
--- a/libminifi/src/core/repository/VolatileRepositoryData.cpp
+++ b/libminifi/src/core/repository/VolatileRepositoryData.cpp
@@ -23,6 +23,7 @@ namespace org::apache::nifi::minifi::core::repository {
 
 VolatileRepositoryData::VolatileRepositoryData(uint32_t max_count, size_t 
max_size)
   : current_size(0),
+    current_entry_count(0),
     max_count(max_count),
     max_size(max_size) {
 }
diff --git a/libminifi/src/core/state/MetricsPublisherStore.cpp 
b/libminifi/src/core/state/MetricsPublisherStore.cpp
index d4622b8ea..c837d264a 100644
--- a/libminifi/src/core/state/MetricsPublisherStore.cpp
+++ b/libminifi/src/core/state/MetricsPublisherStore.cpp
@@ -22,10 +22,10 @@
 
 namespace org::apache::nifi::minifi::state {
 
-MetricsPublisherStore::MetricsPublisherStore(std::shared_ptr<Configure> 
configuration, std::shared_ptr<core::Repository> provenance_repo,
-  std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<core::FlowConfiguration> flow_configuration)
+MetricsPublisherStore::MetricsPublisherStore(std::shared_ptr<Configure> 
configuration, const 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& 
repository_metric_sources,
+  std::shared_ptr<core::FlowConfiguration> flow_configuration)
     : configuration_(configuration),
-      
response_node_loader_(std::make_shared<response::ResponseNodeLoader>(std::move(configuration),
 std::move(provenance_repo), std::move(flow_file_repo), 
std::move(flow_configuration))) {
+      
response_node_loader_(std::make_shared<response::ResponseNodeLoader>(std::move(configuration),
 repository_metric_sources, std::move(flow_configuration))) {
 }
 
 void 
MetricsPublisherStore::initialize(core::controller::ControllerServiceProvider* 
controller, state::StateMonitor* update_sink) {
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp 
b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index aeed88e99..66fe7f213 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -32,12 +32,11 @@
 
 namespace org::apache::nifi::minifi::state::response {
 
-ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr<Configure> 
configuration, std::shared_ptr<core::Repository> provenance_repo,
-    std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<core::FlowConfiguration> flow_configuration)
-  : configuration_(std::move(configuration)),
-    provenance_repo_(std::move(provenance_repo)),
-    flow_file_repo_(std::move(flow_file_repo)),
-    flow_configuration_(std::move(flow_configuration)) {
+ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr<Configure> 
configuration, std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repository_metric_sources,
+  std::shared_ptr<core::FlowConfiguration> flow_configuration)
+    : configuration_(std::move(configuration)),
+      repository_metric_sources_(std::move(repository_metric_sources)),
+      flow_configuration_(std::move(flow_configuration)) {
 }
 
 void ResponseNodeLoader::clearConfigRoot() {
@@ -124,8 +123,9 @@ std::vector<SharedResponseNode> 
ResponseNodeLoader::getResponseNodes(const std::
 void ResponseNodeLoader::initializeRepositoryMetrics(const SharedResponseNode& 
response_node) const {
   auto repository_metrics = 
dynamic_cast<RepositoryMetrics*>(response_node.get());
   if (repository_metrics != nullptr) {
-    repository_metrics->addRepository(provenance_repo_);
-    repository_metrics->addRepository(flow_file_repo_);
+    for (const auto& repo : repository_metric_sources_) {
+      repository_metrics->addRepository(repo);
+    }
   }
 }
 
@@ -155,8 +155,9 @@ void ResponseNodeLoader::initializeAgentIdentifier(const 
SharedResponseNode& res
 void ResponseNodeLoader::initializeAgentMonitor(const SharedResponseNode& 
response_node) const {
   auto monitor = 
dynamic_cast<state::response::AgentMonitor*>(response_node.get());
   if (monitor != nullptr) {
-    monitor->addRepository(provenance_repo_);
-    monitor->addRepository(flow_file_repo_);
+    for (const auto& repo : repository_metric_sources_) {
+      monitor->addRepository(repo);
+    }
     monitor->setStateMonitor(update_sink_);
   }
 }
@@ -176,8 +177,9 @@ void ResponseNodeLoader::initializeAgentNode(const 
SharedResponseNode& response_
 void ResponseNodeLoader::initializeAgentStatus(const SharedResponseNode& 
response_node) const {
   auto agent_status = 
dynamic_cast<state::response::AgentStatus*>(response_node.get());
   if (agent_status != nullptr) {
-    agent_status->addRepository(provenance_repo_);
-    agent_status->addRepository(flow_file_repo_);
+    for (const auto& repo : repository_metric_sources_) {
+      agent_status->addRepository(repo);
+    }
     agent_status->setStateMonitor(update_sink_);
   }
 }
diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h 
b/libminifi/test/flow-tests/TestControllerWithFlow.h
index ced53fea9..760d903e3 100644
--- a/libminifi/test/flow-tests/TestControllerWithFlow.h
+++ b/libminifi/test/flow-tests/TestControllerWithFlow.h
@@ -21,6 +21,7 @@
 #include <string>
 #include <memory>
 #include <utility>
+#include <vector>
 
 #include "FlowController.h"
 #include "unit/ProvenanceTestHelper.h"
@@ -68,7 +69,8 @@ class TestControllerWithFlow: public TestController {
     auto flow = 
std::make_shared<core::YamlConfiguration>(core::ConfigurationContext{ff_repo, 
content_repo, stream_factory, configuration_, yaml_path_.string()});
     auto root = flow->getRoot();
     root_ = root.get();
-    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configuration_, 
prov_repo, ff_repo, flow);
+    std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repo_metric_sources{prov_repo, ff_repo, content_repo};
+    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configuration_, 
repo_metric_sources, flow);
     metrics_publisher_store_ = metrics_publisher_store.get();
     controller_ = std::make_shared<minifi::FlowController>(prov_repo, ff_repo, 
configuration_, std::move(flow), content_repo, 
std::move(metrics_publisher_store));
     controller_->load(std::move(root));
diff --git a/libminifi/test/integration/IntegrationBase.h 
b/libminifi/test/integration/IntegrationBase.h
index 84bfedc7e..00631ffdf 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -24,6 +24,8 @@
 #include <optional>
 #include <string>
 #include <utility>
+#include <vector>
+
 #include "core/logging/Logger.h"
 #include "core/ProcessGroup.h"
 #include "core/yaml/YamlConfiguration.h"
@@ -195,7 +197,8 @@ void IntegrationBase::run(const 
std::optional<std::filesystem::path>& test_file_
       running = true;
     };
 
-    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configuration, 
test_repo, test_flow_repo, flow_config);
+    std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repo_metric_sources{test_repo, test_flow_repo, content_repo};
+    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configuration, 
repo_metric_sources, flow_config);
     flowController_ = std::make_unique<minifi::FlowController>(test_repo, 
test_flow_repo, configuration,
       std::move(flow_config), content_repo, 
std::move(metrics_publisher_store), filesystem, request_restart);
     flowController_->load();
diff --git a/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp 
b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp
index db437fb61..ef061d290 100644
--- a/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp
@@ -52,7 +52,7 @@ void verifyMaxKeyCount(const 
minifi::provenance::ProvenanceRepository& repo, uin
 
   for (int i = 0; i < 50; ++i) {
     std::this_thread::sleep_for(100ms);
-    k = std::min(k, repo.getKeyCount());
+    k = std::min(k, repo.getRepositoryEntryCount());
     if (k < keyCount) {
       break;
     }
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp 
b/libminifi/test/rocksdb-tests/RepoTests.cpp
index 5d921fff0..4a187cca6 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -27,6 +27,7 @@
 #include "core/RepositoryFactory.h"
 #include "FlowFileRecord.h"
 #include "FlowFileRepository.h"
+#include "ProvenanceRepository.h"
 #include "provenance/Provenance.h"
 #include "properties/Configure.h"
 #include "../unit/ProvenanceTestHelper.h"
@@ -34,6 +35,9 @@
 #include "../Catch.h"
 #include "utils/gsl.h"
 #include "utils/IntegrationTestUtils.h"
+#include "core/repository/VolatileFlowFileRepository.h"
+#include "core/repository/VolatileProvenanceRepository.h"
+#include "DatabaseContentRepository.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -515,4 +519,221 @@ TEST_CASE("FlowFileRepository synchronously pushes 
existing flow files") {
   }
 }
 
+TEST_CASE("Test getting flow file repository size properties", 
"[TestGettingRepositorySize]") {
+  
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
+  
LogTestController::getInstance().setDebug<minifi::provenance::ProvenanceRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::VolatileFlowFileRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::VolatileProvenanceRepository>();
+  TestController testController;
+  auto dir = testController.createTempDirectory();
+
+  std::shared_ptr<core::Repository> repository;
+  auto expected_is_full = false;
+  uint64_t expected_max_repo_size = 0;
+  SECTION("FlowFileRepository") {
+    repository = std::make_shared<core::repository::FlowFileRepository>("ff", 
dir.string(), 0ms, 0, 1ms);
+  }
+
+  SECTION("ProvenanceRepository") {
+    repository = 
std::make_shared<minifi::provenance::ProvenanceRepository>("ff", dir.string(), 
0ms, 0, 1ms);
+  }
+
+  SECTION("VolatileFlowFileRepository") {
+    repository = 
std::make_shared<core::repository::VolatileFlowFileRepository>("ff", 
dir.string(), 0ms, 10, 1ms);
+    expected_is_full = true;
+    expected_max_repo_size = 7;
+  }
+
+  SECTION("VolatileProvenanceRepository") {
+    repository = 
std::make_shared<core::repository::VolatileProvenanceRepository>("ff", 
dir.string(), 0ms, 10, 1ms);
+    expected_is_full = true;
+    expected_max_repo_size = 7;
+  }
+  auto configuration = std::make_shared<minifi::Configure>();
+  repository->initialize(configuration);
+
+  auto flow_file = std::make_shared<minifi::FlowFileRecord>();
+
+  for (auto i = 0; i < 100; ++i) {
+    flow_file->addAttribute("key" + std::to_string(i), "testattributevalue" + 
std::to_string(i));
+  }
+
+  auto original_size = repository->getRepositorySize();
+  using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+  REQUIRE(verifyEventHappenedInPollTime(std::chrono::seconds(5), 
[&original_size, &repository] {
+      auto old_size = original_size;
+      original_size = repository->getRepositorySize();
+      return old_size == original_size;
+    },
+    std::chrono::milliseconds(50)));
+  REQUIRE(true == flow_file->Persist(repository));
+  auto flow_file_2 = std::make_shared<minifi::FlowFileRecord>();
+  REQUIRE(true == flow_file_2->Persist(repository));
+
+  repository->flush();
+  repository->stop();
+
+  auto new_size = repository->getRepositorySize();
+  REQUIRE(verifyEventHappenedInPollTime(std::chrono::seconds(5), [&new_size, 
&repository] {
+      auto old_size = new_size;
+      new_size = repository->getRepositorySize();
+      return old_size == new_size;
+    },
+    std::chrono::milliseconds(50)));
+  REQUIRE(new_size > original_size);
+  REQUIRE(expected_is_full == repository->isFull());
+  REQUIRE(expected_max_repo_size == repository->getMaxRepositorySize());
+  REQUIRE(2 == repository->getRepositoryEntryCount());
+}
+
+TEST_CASE("Test getting noop repository size properties", 
"[TestGettingRepositorySize]") {
+  TestController testController;
+  auto dir = testController.createTempDirectory();
+
+  auto repository = minifi::core::createRepository("NoOpRepository", "ff");
+
+  repository->initialize(std::make_shared<minifi::Configure>());
+
+  auto flow_file = std::make_shared<minifi::FlowFileRecord>();
+
+  flow_file->addAttribute("key", "testattributevalue");
+
+  repository->flush();
+  repository->stop();
+
+  REQUIRE(repository->getRepositorySize() == 0);
+  REQUIRE(!repository->isFull());
+  REQUIRE(repository->getMaxRepositorySize() == 0);
+  REQUIRE(repository->getRepositoryEntryCount() == 0);
+}
+
+TEST_CASE("Test getting content repository size properties", 
"[TestGettingRepositorySize]") {
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::DatabaseContentRepository>();
+  TestController testController;
+  auto dir = testController.createTempDirectory();
+
+  auto repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 
0, 1ms);
+
+  auto content_repo_dir = testController.createTempDirectory();
+  auto configuration = std::make_shared<minifi::Configure>();
+  
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
 content_repo_dir.string());
+  std::string content = "content";
+  
configuration->set(minifi::Configure::nifi_volatile_repository_options_content_max_bytes,
 std::to_string(content.size()));
+
+  std::shared_ptr<core::ContentRepository> content_repo;
+  auto expected_is_full = false;
+  uint64_t expected_max_repo_size = 0;
+  SECTION("FileSystemRepository") {
+    content_repo = std::make_shared<core::repository::FileSystemRepository>();
+  }
+
+  SECTION("VolatileContentRepository") {
+    content_repo = 
std::make_shared<core::repository::VolatileContentRepository>("content");
+    expected_is_full = true;
+    expected_max_repo_size = content.size();
+  }
+
+  SECTION("DatabaseContentRepository") {
+    content_repo = 
std::make_shared<core::repository::DatabaseContentRepository>();
+  }
+
+  content_repo->initialize(configuration);
+
+  repository->initialize(configuration);
+  repository->loadComponent(content_repo);
+  auto original_content_repo_size = content_repo->getRepositorySize();
+
+  auto flow_file = std::make_shared<minifi::FlowFileRecord>();
+
+  auto content_session = content_repo->createSession();
+  auto claim = content_session->create();
+  auto stream = content_session->write(claim);
+  stream->write(gsl::make_span(content).as_span<const std::byte>());
+  flow_file->setResourceClaim(claim);
+  flow_file->setSize(stream->size());
+  flow_file->setOffset(0);
+
+  stream->close();
+  content_session->commit();
+
+  repository->flush();
+  repository->stop();
+
+  using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+  REQUIRE(verifyEventHappenedInPollTime(std::chrono::seconds(5), 
[&original_content_repo_size, &content_repo] {
+      auto new_content_repo_size = content_repo->getRepositorySize();
+      return new_content_repo_size > original_content_repo_size;
+    },
+    std::chrono::milliseconds(50)));
+
+  REQUIRE(expected_is_full == content_repo->isFull());
+  REQUIRE(expected_max_repo_size == content_repo->getMaxRepositorySize());
+  REQUIRE(1 == content_repo->getRepositoryEntryCount());
+}
+
+TEST_CASE("Flow file repositories can be stopped", "[TestRepoIsRunning]") {
+  
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
+  
LogTestController::getInstance().setDebug<minifi::provenance::ProvenanceRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::VolatileFlowFileRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::VolatileProvenanceRepository>();
+  TestController testController;
+  auto dir = testController.createTempDirectory();
+
+  std::shared_ptr<core::Repository> repository;
+  SECTION("FlowFileRepository") {
+    repository = std::make_shared<core::repository::FlowFileRepository>("ff", 
dir.string(), 0ms, 0, 1ms);
+  }
+
+  SECTION("ProvenanceRepository") {
+    repository = 
std::make_shared<minifi::provenance::ProvenanceRepository>("ff", dir.string(), 
0ms, 0, 1ms);
+  }
+
+  SECTION("VolatileFlowFileRepository") {
+    repository = 
std::make_shared<core::repository::VolatileFlowFileRepository>("ff", 
dir.string(), 0ms, 10, 1ms);
+  }
+
+  SECTION("VolatileProvenanceRepository") {
+    repository = 
std::make_shared<core::repository::VolatileProvenanceRepository>("ff", 
dir.string(), 0ms, 10, 1ms);
+  }
+
+  SECTION("NoOpRepository") {
+    repository = minifi::core::createRepository("NoOpRepository", "ff");
+  }
+
+  repository->initialize(std::make_shared<minifi::Configure>());
+
+  REQUIRE(!repository->isRunning());
+  repository->start();
+  REQUIRE(repository->isRunning());
+  repository->stop();
+  REQUIRE(!repository->isRunning());
+}
+
+TEST_CASE("Content repositories are always running", "[TestRepoIsRunning]") {
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::DatabaseContentRepository>();
+  TestController testController;
+  auto dir = testController.createTempDirectory();
+
+  std::shared_ptr<core::ContentRepository> content_repo;
+  SECTION("FileSystemRepository") {
+    content_repo = std::make_shared<core::repository::FileSystemRepository>();
+  }
+
+  SECTION("VolatileContentRepository") {
+    content_repo = 
std::make_shared<core::repository::VolatileContentRepository>("content");
+  }
+
+  SECTION("DatabaseContentRepository") {
+    content_repo = 
std::make_shared<core::repository::DatabaseContentRepository>();
+  }
+
+  REQUIRE(content_repo->isRunning());
+}
+
 }  // namespace
diff --git a/libminifi/test/unit/ControllerSocketMetricsPublisherTest.cpp 
b/libminifi/test/unit/ControllerSocketMetricsPublisherTest.cpp
index 71a336ac2..8db4e2f00 100644
--- a/libminifi/test/unit/ControllerSocketMetricsPublisherTest.cpp
+++ b/libminifi/test/unit/ControllerSocketMetricsPublisherTest.cpp
@@ -63,7 +63,7 @@ class ControllerSocketMetricsPublisherTestFixture {
  public:
   ControllerSocketMetricsPublisherTestFixture()
       : configuration_(std::make_shared<Configure>()),
-        
response_node_loader_(std::make_shared<state::response::ResponseNodeLoader>(configuration_,
 nullptr, nullptr, nullptr)),
+        
response_node_loader_(std::make_shared<state::response::ResponseNodeLoader>(configuration_,
 std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr)),
         test_response_node_(std::make_shared<TestQueueMetrics>()),
         controller_socket_metrics_publisher_("test_publisher") {
     controller_socket_metrics_publisher_.initialize(configuration_, 
response_node_loader_);
diff --git a/libminifi/test/unit/FileUtilsTests.cpp 
b/libminifi/test/unit/FileUtilsTests.cpp
index fc87866b7..d537f6f27 100644
--- a/libminifi/test/unit/FileUtilsTests.cpp
+++ b/libminifi/test/unit/FileUtilsTests.cpp
@@ -477,3 +477,42 @@ TEST_CASE("FileUtils::get_relative_path", 
"[TestGetRelativePath]") {
   REQUIRE(*FileUtils::get_relative_path(path, base_path / "") == 
std::filesystem::path("subdir") / "file.log");
   REQUIRE(*FileUtils::get_relative_path(base_path, base_path) == ".");
 }
+
+TEST_CASE("FileUtils::path_size", "[TestPathSize]") {
+  auto writeToFile = [](const std::filesystem::path& path) {
+    std::ofstream test_file_stream(path, std::ios::out | std::ios::binary);
+    test_file_stream << "foo\n";
+    test_file_stream.flush();
+  };
+
+  TestController test_controller;
+  REQUIRE(FileUtils::path_size({""}) == 0);
+  REQUIRE(FileUtils::path_size({"/random/non-existent/dir"}) == 0);
+
+  auto dir = test_controller.createTempDirectory();
+  REQUIRE(FileUtils::path_size(dir) == 0);
+
+  auto test_file = dir / "test_file.log";
+  writeToFile(test_file);
+
+  REQUIRE(FileUtils::path_size(test_file) == 4);
+  REQUIRE(FileUtils::path_size(dir) == 4);
+
+  auto subdir = dir / "subdir";
+  REQUIRE(utils::file::create_dir(subdir) == 0);
+
+  REQUIRE(FileUtils::path_size(dir) == 4);
+
+  auto subdir_test_file = subdir / "test_file2.log";
+  writeToFile(subdir_test_file);
+
+  REQUIRE(FileUtils::path_size(dir) == 8);
+
+  auto subsubdir = subdir / "subsubdir";
+  REQUIRE(utils::file::create_dir(subsubdir) == 0);
+
+  auto subsubdir_test_file = subsubdir / "test_file3.log";
+  writeToFile(subsubdir_test_file);
+
+  REQUIRE(FileUtils::path_size(dir) == 12);
+}
diff --git a/libminifi/test/unit/MetricsTests.cpp 
b/libminifi/test/unit/MetricsTests.cpp
index 972365fd4..af6d9e963 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -26,16 +26,22 @@
 #include "repository/VolatileContentRepository.h"
 #include "ProvenanceTestHelper.h"
 #include "../DummyProcessor.h"
+#include "range/v3/algorithm/find_if.hpp"
 
 using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::test {
 
+void checkSerializedValue(const 
std::vector<org::apache::nifi::minifi::state::response::SerializedResponseNode>&
 children, const std::string& name, const std::string& expected_value) {
+  auto it = ranges::find_if(children, [&](const auto& child) { return 
child.name == name; });
+  REQUIRE(it != children.end());
+  REQUIRE(expected_value == it->value.to_string());
+}
+
 TEST_CASE("QueueMetricsTestNoConnections", "[c2m2]") {
   minifi::state::response::QueueMetrics metrics;
 
   REQUIRE("QueueMetrics" == metrics.getName());
-
   REQUIRE(metrics.serialize().empty());
 }
 
@@ -58,40 +64,24 @@ TEST_CASE("QueueMetricsTestConnections", "[c2m3]") {
 
   metrics.updateConnection(connection.get());
 
-  REQUIRE(1 == metrics.serialize().size());
+  auto seialized_metrics = metrics.serialize();
+  REQUIRE(1 == seialized_metrics.size());
 
   minifi::state::response::SerializedResponseNode resp = 
metrics.serialize().at(0);
 
   REQUIRE("testconnection" == resp.name);
-
   REQUIRE(4 == resp.children.size());
 
-  minifi::state::response::SerializedResponseNode datasize = 
resp.children.at(0);
-
-  REQUIRE("datasize" == datasize.name);
-  REQUIRE("0" == datasize.value.to_string());
-
-  minifi::state::response::SerializedResponseNode datasizemax = 
resp.children.at(1);
-
-  REQUIRE("datasizemax" == datasizemax.name);
-  REQUIRE("1024" == datasizemax.value);
-
-  minifi::state::response::SerializedResponseNode queued = resp.children.at(2);
-
-  REQUIRE("queued" == queued.name);
-  REQUIRE("0" == queued.value.to_string());
-
-  minifi::state::response::SerializedResponseNode queuedmax = 
resp.children.at(3);
-
-  REQUIRE("queuedmax" == queuedmax.name);
-  REQUIRE("1024" == queuedmax.value.to_string());
+  checkSerializedValue(resp.children, "datasize", "0");
+  checkSerializedValue(resp.children, "datasizemax", "1024");
+  checkSerializedValue(resp.children, "queued", "0");
+  checkSerializedValue(resp.children, "queuedmax", "1024");
 }
 
 TEST_CASE("RepositorymetricsNoRepo", "[c2m4]") {
   minifi::state::response::RepositoryMetrics metrics;
 
   REQUIRE("RepositoryMetrics" == metrics.getName());
-
   REQUIRE(metrics.serialize().empty());
 }
 
@@ -109,23 +99,13 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
     minifi::state::response::SerializedResponseNode resp = 
metrics.serialize().at(0);
 
     REQUIRE("repo_name" == resp.name);
+    REQUIRE(5 == resp.children.size());
 
-    REQUIRE(3 == resp.children.size());
-
-    minifi::state::response::SerializedResponseNode running = 
resp.children.at(0);
-
-    REQUIRE("running" == running.name);
-    REQUIRE("false" == running.value.to_string());
-
-    minifi::state::response::SerializedResponseNode full = resp.children.at(1);
-
-    REQUIRE("full" == full.name);
-    REQUIRE("false" == full.value);
-
-    minifi::state::response::SerializedResponseNode size = resp.children.at(2);
-
-    REQUIRE("size" == size.name);
-    REQUIRE("0" == size.value);
+    checkSerializedValue(resp.children, "running", "false");
+    checkSerializedValue(resp.children, "full", "false");
+    checkSerializedValue(resp.children, "size", "0");
+    checkSerializedValue(resp.children, "maxSize", "0");
+    checkSerializedValue(resp.children, "entryCount", "0");
   }
 
   repo->start();
@@ -135,26 +115,16 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
     minifi::state::response::SerializedResponseNode resp = 
metrics.serialize().at(0);
 
     REQUIRE("repo_name" == resp.name);
+    REQUIRE(5 == resp.children.size());
 
-    REQUIRE(3 == resp.children.size());
-
-    minifi::state::response::SerializedResponseNode running = 
resp.children.at(0);
-
-    REQUIRE("running" == running.name);
-    REQUIRE("true" == running.value.to_string());
-
-    minifi::state::response::SerializedResponseNode full = resp.children.at(1);
-
-    REQUIRE("full" == full.name);
-    REQUIRE("false" == full.value);
-
-    minifi::state::response::SerializedResponseNode size = resp.children.at(2);
-
-    REQUIRE("size" == size.name);
-    REQUIRE("0" == size.value);
+    checkSerializedValue(resp.children, "running", "true");
+    checkSerializedValue(resp.children, "full", "false");
+    checkSerializedValue(resp.children, "size", "0");
+    checkSerializedValue(resp.children, "maxSize", "0");
+    checkSerializedValue(resp.children, "entryCount", "0");
   }
 
-  repo->setFull();
+  repo->stop();
 
   {
     REQUIRE(1 == metrics.serialize().size());
@@ -162,50 +132,54 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
     minifi::state::response::SerializedResponseNode resp = 
metrics.serialize().at(0);
 
     REQUIRE("repo_name" == resp.name);
+    REQUIRE(5 == resp.children.size());
 
-    REQUIRE(3 == resp.children.size());
-
-    minifi::state::response::SerializedResponseNode running = 
resp.children.at(0);
-
-    REQUIRE("running" == running.name);
-    REQUIRE("true" == running.value.to_string());
-
-    minifi::state::response::SerializedResponseNode full = resp.children.at(1);
-
-    REQUIRE("full" == full.name);
-    REQUIRE("true" == full.value.to_string());
+    checkSerializedValue(resp.children, "running", "false");
+    checkSerializedValue(resp.children, "full", "false");
+    checkSerializedValue(resp.children, "size", "0");
+    checkSerializedValue(resp.children, "maxSize", "0");
+    checkSerializedValue(resp.children, "entryCount", "0");
+  }
+}
 
-    minifi::state::response::SerializedResponseNode size = resp.children.at(2);
+TEST_CASE("VolatileRepositorymetricsCanBeFull", "[c2m4]") {
+  minifi::state::response::RepositoryMetrics metrics;
 
-    REQUIRE("size" == size.name);
-    REQUIRE("0" == size.value.to_string());
-  }
+  REQUIRE("RepositoryMetrics" == metrics.getName());
 
-  repo->stop();
+  auto repo = std::make_shared<TestVolatileRepository>();
 
+  metrics.addRepository(repo);
   {
     REQUIRE(1 == metrics.serialize().size());
 
     minifi::state::response::SerializedResponseNode resp = 
metrics.serialize().at(0);
 
     REQUIRE("repo_name" == resp.name);
+    REQUIRE(5 == resp.children.size());
 
-    REQUIRE(3 == resp.children.size());
-
-    minifi::state::response::SerializedResponseNode running = 
resp.children.at(0);
+    checkSerializedValue(resp.children, "running", "false");
+    checkSerializedValue(resp.children, "full", "false");
+    checkSerializedValue(resp.children, "size", "0");
+    checkSerializedValue(resp.children, "maxSize", 
std::to_string(static_cast<int64_t>(TEST_MAX_REPOSITORY_STORAGE_SIZE * 0.75)));
+    checkSerializedValue(resp.children, "entryCount", "0");
+  }
 
-    REQUIRE("running" == running.name);
-    REQUIRE("false" == running.value.to_string());
+  repo->setFull();
 
-    minifi::state::response::SerializedResponseNode full = resp.children.at(1);
+  {
+    REQUIRE(1 == metrics.serialize().size());
 
-    REQUIRE("full" == full.name);
-    REQUIRE("true" == full.value);
+    minifi::state::response::SerializedResponseNode resp = 
metrics.serialize().at(0);
 
-    minifi::state::response::SerializedResponseNode size = resp.children.at(2);
+    REQUIRE("repo_name" == resp.name);
+    REQUIRE(5 == resp.children.size());
 
-    REQUIRE("size" == size.name);
-    REQUIRE("0" == size.value);
+    checkSerializedValue(resp.children, "running", "false");
+    checkSerializedValue(resp.children, "full", "true");
+    checkSerializedValue(resp.children, "size", 
std::to_string(static_cast<int64_t>(TEST_MAX_REPOSITORY_STORAGE_SIZE * 0.75)));
+    checkSerializedValue(resp.children, "maxSize", 
std::to_string(static_cast<int64_t>(TEST_MAX_REPOSITORY_STORAGE_SIZE * 0.75)));
+    checkSerializedValue(resp.children, "entryCount", "10000");
   }
 }
 
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h 
b/libminifi/test/unit/ProvenanceTestHelper.h
index f2d04ed70..3f5a16c72 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -28,6 +28,7 @@
 #include <utility>
 #include <vector>
 #include "core/repository/VolatileContentRepository.h"
+#include "core/repository/VolatileFlowFileRepository.h"
 #include "core/Processor.h"
 #include "core/ThreadedRepository.h"
 #include "Connection.h"
@@ -39,21 +40,19 @@
 
 using namespace std::literals::chrono_literals;
 
+const int64_t TEST_MAX_REPOSITORY_STORAGE_SIZE = 100;
+
 template <typename T_BaseRepository>
 class TestRepositoryBase : public T_BaseRepository {
  public:
   TestRepositoryBase()
-    : T_BaseRepository("repo_name", "./dir", 1s, 100, 0ms) {
+    : T_BaseRepository("repo_name", "./dir", 1s, 
TEST_MAX_REPOSITORY_STORAGE_SIZE, 0ms) {
   }
 
   bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> 
&) override {
     return true;
   }
 
-  void setFull() {
-    T_BaseRepository::repo_full_ = true;
-  }
-
   bool isNoop() const override {
     return false;
   }
@@ -125,6 +124,30 @@ class TestRepository : public 
TestRepositoryBase<org::apache::nifi::minifi::core
   bool stop() override {
     return true;
   }
+
+  uint64_t getRepositorySize() const override {
+    return 0;
+  }
+
+  uint64_t getRepositoryEntryCount() const override {
+    return 0;
+  }
+};
+
+class TestVolatileRepository : public 
TestRepositoryBase<org::apache::nifi::minifi::core::repository::VolatileFlowFileRepository>
 {
+ public:
+  bool start() override {
+    return true;
+  }
+
+  bool stop() override {
+    return true;
+  }
+
+  void setFull() {
+    repo_data_.current_size = repo_data_.max_size;
+    repo_data_.current_entry_count = repo_data_.max_count;
+  }
 };
 
 class TestThreadedRepository : public 
TestRepositoryBase<org::apache::nifi::minifi::core::ThreadedRepository> {
@@ -142,13 +165,21 @@ class TestThreadedRepository : public 
TestRepositoryBase<org::apache::nifi::mini
     return thread_;
   }
 
+  uint64_t getRepositorySize() const override {
+    return 0;
+  }
+
+  uint64_t getRepositoryEntryCount() const override {
+    return 0;
+  }
+
   std::thread thread_;
 };
 
 class TestFlowRepository : public 
org::apache::nifi::minifi::core::ThreadedRepository {
  public:
   TestFlowRepository()
-    : org::apache::nifi::minifi::core::ThreadedRepository("ff", "./dir", 1s, 
100, 0ms) {
+    : org::apache::nifi::minifi::core::ThreadedRepository("ff", "./dir", 1s, 
TEST_MAX_REPOSITORY_STORAGE_SIZE, 0ms) {
   }
 
   bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> 
&) override {
@@ -178,6 +209,14 @@ class TestFlowRepository : public 
org::apache::nifi::minifi::core::ThreadedRepos
     }
   }
 
+  uint64_t getRepositorySize() const override {
+    return 0;
+  }
+
+  uint64_t getRepositoryEntryCount() const override {
+    return 0;
+  }
+
  private:
   void run() override {
   }
diff --git a/libminifi/test/unit/ResponseNodeLoaderTests.cpp 
b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
index 1c5d41ea1..f0926523e 100644
--- a/libminifi/test/unit/ResponseNodeLoaderTests.cpp
+++ b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
@@ -36,7 +36,7 @@ class ResponseNodeLoaderTestFixture {
       prov_repo_(std::make_shared<TestRepository>()),
       ff_repository_(std::make_shared<TestRepository>()),
       
content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
-      response_node_loader_(configuration_, prov_repo_, ff_repository_, 
nullptr) {
+      response_node_loader_(configuration_, {prov_repo_, ff_repository_, 
content_repo_}, nullptr) {
     ff_repository_->initialize(configuration_);
     content_repo_->initialize(configuration_);
     auto uuid1 = 
addProcessor<minifi::processors::WriteToFlowFileTestProcessor>("WriteToFlowFileTestProcessor1");
diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp
index 29476495c..1c97cb757 100644
--- a/minifi_main/MiNiFiMain.cpp
+++ b/minifi_main/MiNiFiMain.cpp
@@ -371,7 +371,8 @@ int main(int argc, char **argv) {
           .path = 
configure->get(minifi::Configure::nifi_flow_configuration_file),
           .filesystem = filesystem}, nifi_configuration_class_name);
 
-    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configure, prov_repo, 
flow_repo, flow_configuration);
+    std::vector<std::shared_ptr<core::RepositoryMetricsSource>> 
repo_metric_sources{prov_repo, flow_repo, content_repo};
+    auto metrics_publisher_store = 
std::make_unique<minifi::state::MetricsPublisherStore>(configure, 
repo_metric_sources, flow_configuration);
 
     const auto controller = std::make_unique<minifi::FlowController>(
       prov_repo, flow_repo, configure, std::move(flow_configuration), 
content_repo, std::move(metrics_publisher_store), filesystem, request_restart);

Reply via email to