This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 260c229fe3b64265bff269deac0311407ec4e183
Author: Martin Zink <martin.z...@protonmail.com>
AuthorDate: Thu Mar 14 14:07:34 2024 +0100

    MINIFICPP-2306 Filter out corrupt flowfiles during startup
    
    Closes #1738
    
    Signed-off-by: Marton Szasz <sza...@apache.org>
---
 CONFIGURE.md                                       |   4 +
 conf/minifi.properties                             |   1 +
 extensions/rocksdb-repos/FlowFileRepository.cpp    |  76 +++++++----
 extensions/rocksdb-repos/FlowFileRepository.h      |   7 +
 extensions/rocksdb-repos/tests/RepoTests.cpp       | 145 +++++++++++++++------
 libminifi/include/FlowFileRecord.h                 |   2 +-
 libminifi/include/core/StreamManager.h             |   7 +-
 .../include/core/repository/FileSystemRepository.h |  33 +++--
 libminifi/include/properties/Configuration.h       |   2 +
 .../src/core/repository/FileSystemRepository.cpp   |  37 ++++--
 10 files changed, 216 insertions(+), 98 deletions(-)

diff --git a/CONFIGURE.md b/CONFIGURE.md
index 2df8d695a..d057bc760 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -362,6 +362,10 @@ The Content Repository can be configured with the 
`nifi.content.repository.class
     # in minifi.properties
     nifi.content.repository.class.name=FileSystemRepository
 
+During startup, MiNiFi checks whether the content of each persisted flowfile is present and large enough to hold the flowfile's data (corruption can rarely occur due to ungraceful shutdowns), and filters out any flowfiles that fail this check.
+This can slow down startup if there is a significant number of flowfiles. This health check can be disabled by setting `nifi.flowfile.repository.check.health` to `false`.
+
+
 The Provenance Repository can be configured with the 
`nifi.provenance.repository.class.name` property. If not specified, it uses the 
`ProvenanceRepository` class by default, which persists the provenance events 
in a RocksDB database. Alternatively it can be configured to use a 
`VolatileProvenanceRepository` that keeps the state in memory (so the state 
gets lost upon restart), or the `NoOpRepository` to not keep track of the 
provenance events. By default we do not keep track of the proven [...]
 
     # in minifi.properties
diff --git a/conf/minifi.properties b/conf/minifi.properties
index d2d679f35..9ba8a0a08 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -29,6 +29,7 @@ nifi.provenance.repository.max.storage.time=1 MIN
 nifi.provenance.repository.max.storage.size=1 MB
 nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
 # nifi.flowfile.repository.rocksdb.compression=auto
+# nifi.flowfile.repository.check.health=true
 
nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
 nifi.provenance.repository.class.name=NoOpRepository
 nifi.content.repository.class.name=DatabaseContentRepository
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp 
b/extensions/rocksdb-repos/FlowFileRepository.cpp
index ca81c795a..abdc2fcf0 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -25,7 +25,6 @@
 #include <vector>
 
 #include "rocksdb/options.h"
-#include "rocksdb/write_batch.h"
 #include "rocksdb/slice.h"
 #include "FlowFileRecord.h"
 #include "utils/gsl.h"
@@ -119,6 +118,22 @@ void FlowFileRepository::run() {
   flush();
 }
 
+bool FlowFileRepository::contentSizeIsAmpleForFlowFile(const FlowFileRecord& 
flow_file_record, const std::shared_ptr<ResourceClaim>& resource_claim) const {
+  const auto stream_size = resource_claim ? 
content_repo_->size(*resource_claim) : 0;
+  const auto required_size = flow_file_record.getOffset() + 
flow_file_record.getSize();
+  return stream_size >= required_size;
+}
+
+Connectable* FlowFileRepository::getContainer(const std::string& container_id) 
{
+  auto container = containers_.find(container_id);
+  if (container != containers_.end())
+    return container->second;
+  // for backward compatibility
+  container = connection_map_.find(container_id);
+  if (container != connection_map_.end())
+    return container->second;
+  return nullptr;
+}
 void FlowFileRepository::initialize_repository() {
   auto opendb = db_->open();
   if (!opendb) {
@@ -127,37 +142,37 @@ void FlowFileRepository::initialize_repository() {
   }
   logger_->log_info("Reading existing flow files from database");
 
-  auto it = opendb->NewIterator(rocksdb::ReadOptions());
+  const auto it = opendb->NewIterator(rocksdb::ReadOptions());
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
     utils::Identifier container_id;
     auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>(), content_repo_, container_id);
-    std::string key = it->key().ToString();
-    if (eventRead) {
-      // on behalf of the just resurrected persisted instance
-      auto claim = eventRead->getResourceClaim();
-      if (claim) claim->increaseFlowFileRecordOwnedCount();
-      bool found = false;
-      auto search = containers_.find(container_id.to_string());
-      found = (search != containers_.end());
-      if (!found) {
-        // for backward compatibility
-        search = connection_map_.find(container_id.to_string());
-        found = (search != connection_map_.end());
-      }
-      if (found) {
-        logger_->log_debug("Found connection for {}, path {} ", container_id.to_string(), eventRead->getContentFullPath());
-        eventRead->setStoredToRepository(true);
-        // we found the connection for the persistent flowFile
-        // even if a processor immediately marks it for deletion, flush only happens after prune_stored_flowfiles
-        search->second->restore(eventRead);
-      } else {
-        logger_->log_warn("Could not find connection for {}, path {} ", container_id.to_string(), eventRead->getContentFullPath());
-        keys_to_delete_.enqueue({.key = key, .content = eventRead->getResourceClaim()});
-      }
-    } else {
+    const std::string key = it->key().ToString();
+    if (!eventRead) {
       // failed to deserialize FlowFile, cannot clear claim
       keys_to_delete_.enqueue({.key = key});
+      continue;
+    }
+    auto claim = eventRead->getResourceClaim();
+    if (claim) {  // on behalf of the just resurrected persisted instance
+      claim->increaseFlowFileRecordOwnedCount();
     }
+    const auto container = getContainer(container_id.to_string());
+    if (!container) {
+      logger_->log_warn("Could not find connection for {}, path {} ", container_id.to_string(), eventRead->getContentFullPath());
+      keys_to_delete_.enqueue({.key = key, .content = eventRead->getResourceClaim()});
+      continue;
+    }
+    if (check_flowfile_content_size_ && !contentSizeIsAmpleForFlowFile(*eventRead, claim)) {
+      logger_->log_warn("Content is missing or too small for flowfile {}", eventRead->getContentFullPath());
+      keys_to_delete_.enqueue({.key = key, .content = eventRead->getResourceClaim()});
+      continue;
+    }
+
+    logger_->log_debug("Found connection for {}, path {} ", container_id.to_string(), eventRead->getContentFullPath());
+    eventRead->setStoredToRepository(true);
+    // we found the connection for the persistent flowFile
+    // even if a processor immediately marks it for deletion, flush only happens after prune_stored_flowfiles
+    container->restore(eventRead);
   }
   flush();
   content_repo_->clearOrphans();
@@ -170,6 +185,14 @@ void FlowFileRepository::loadComponent(const 
std::shared_ptr<core::ContentReposi
   initialize_repository();
 }
 
+namespace {
+bool getRepositoryCheckHealth(const Configure& configure) {
+  std::string check_health_str;
+  configure.get(Configure::nifi_flow_file_repository_check_health, 
check_health_str);
+  return utils::string::toBool(check_health_str).value_or(true);
+}
+}  // namespace
+
 bool FlowFileRepository::initialize(const std::shared_ptr<Configure> 
&configure) {
   config_ = configure;
   std::string value;
@@ -177,6 +200,7 @@ bool FlowFileRepository::initialize(const 
std::shared_ptr<Configure> &configure)
   if (configure->get(Configure::nifi_flowfile_repository_directory_default, 
value) && !value.empty()) {
     directory_ = value;
   }
+  check_flowfile_content_size_ = getRepositoryCheckHealth(*configure);
   logger_->log_debug("NiFi FlowFile Repository Directory {}", directory_);
 
   setCompactionPeriod(configure);
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h 
b/extensions/rocksdb-repos/FlowFileRepository.h
index d22ea0de1..7e9b071b1 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -41,6 +41,9 @@
 #include "utils/StoppableThread.h"
 #include "RocksDbRepository.h"
 
+namespace org::apache::nifi::minifi {
+class FlowFileRecord;
+}
 namespace org::apache::nifi::minifi::core::repository {
 
 #ifdef WIN32
@@ -114,6 +117,9 @@ class FlowFileRepository : public RocksDbRepository, public 
SwapManager {
 
   void deserializeFlowFilesWithNoContentClaim(minifi::internal::OpenRocksDb& 
opendb, std::list<ExpiredFlowFileInfo>& flow_files);
 
+  bool contentSizeIsAmpleForFlowFile(const FlowFileRecord& flow_file_record, 
const std::shared_ptr<ResourceClaim>& resource_claim) const;
+  Connectable* getContainer(const std::string& container_id);
+
   moodycamel::ConcurrentQueue<ExpiredFlowFileInfo> keys_to_delete_;
   std::shared_ptr<core::ContentRepository> content_repo_;
   std::unique_ptr<FlowFileLoader> swap_loader_;
@@ -121,6 +127,7 @@ class FlowFileRepository : public RocksDbRepository, public 
SwapManager {
 
   std::chrono::milliseconds compaction_period_;
   std::unique_ptr<utils::StoppableThread> compaction_thread_;
+  bool check_flowfile_content_size_ = true;
 };
 
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/tests/RepoTests.cpp 
b/extensions/rocksdb-repos/tests/RepoTests.cpp
index a253a89bc..fedd42e15 100644
--- a/extensions/rocksdb-repos/tests/RepoTests.cpp
+++ b/extensions/rocksdb-repos/tests/RepoTests.cpp
@@ -18,7 +18,6 @@
 
 #include <chrono>
 #include <map>
-#include <memory>
 #include <string>
 #include <thread>
 #include <optional>
@@ -39,13 +38,14 @@
 #include "core/repository/VolatileFlowFileRepository.h"
 #include "core/repository/VolatileProvenanceRepository.h"
 #include "DatabaseContentRepository.h"
+#include "catch2/generators/catch_generators.hpp"
 
 using namespace std::literals::chrono_literals;
 
 namespace {
 
 namespace {
-class TestProcessor : public minifi::core::Processor {
+class TestProcessor final : public core::Processor {
  public:
   using Processor::Processor;
 
@@ -58,10 +58,10 @@ class TestProcessor : public minifi::core::Processor {
 }  // namespace
 
 TEST_CASE("Test Repo Names", "[TestFFR1]") {
-  auto repoA = minifi::core::createRepository("FlowFileRepository", 
"flowfile");
+  const auto repoA = minifi::core::createRepository("FlowFileRepository", 
"flowfile");
   REQUIRE("flowfile" == repoA->getName());
 
-  auto repoB = minifi::core::createRepository("ProvenanceRepository", 
"provenance");
+  const auto repoB = minifi::core::createRepository("ProvenanceRepository", 
"provenance");
   REQUIRE("provenance" == repoB->getName());
 }
 
@@ -70,15 +70,15 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
   
LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
   
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
   TestController testController;
-  auto dir = testController.createTempDirectory();
-  std::shared_ptr<core::repository::FlowFileRepository> repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 
0, 1ms);
+  const auto dir = testController.createTempDirectory();
+  const auto repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 
0, 1ms);
 
   const auto configuration = std::make_shared<minifi::Configure>();
   configuration->setHome(dir);
   repository->initialize(configuration);
 
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-  auto file = std::make_shared<minifi::FlowFileRecord>();
+  const auto file = std::make_shared<minifi::FlowFileRecord>();
 
   file->addAttribute("keyA", "");
 
@@ -92,14 +92,14 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
   
LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
   
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
   TestController testController;
-  auto dir = testController.createTempDirectory();
-  std::shared_ptr<core::repository::FlowFileRepository> repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 
0, 1ms);
+  const auto dir = testController.createTempDirectory();
+  const auto repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 
0, 1ms);
 
   const auto configuration = std::make_shared<minifi::Configure>();
   configuration->setHome(dir);
   repository->initialize(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-  auto file = std::make_shared<minifi::FlowFileRecord>();
+  const auto file = std::make_shared<minifi::FlowFileRecord>();
 
   file->addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
 
@@ -116,7 +116,7 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
   
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
   TestController testController;
   auto dir = testController.createTempDirectory();
-  std::shared_ptr<core::repository::FlowFileRepository> repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 
0, 1ms);
+  const auto repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 
0, 1ms);
 
   const auto configuration = std::make_shared<minifi::Configure>();
   configuration->setHome(dir);
@@ -166,7 +166,7 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") {
 
   auto dir = testController.createTempDirectory();
 
-  std::shared_ptr<core::repository::FlowFileRepository> repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 
0, 1ms);
+  const auto repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 
0, 1ms);
 
   std::fstream file;
   file.open(dir / "tstFile.ext", std::ios::out);
@@ -183,7 +183,7 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") {
 
 
   {
-    std::shared_ptr<minifi::ResourceClaim> claim = 
std::make_shared<minifi::ResourceClaim>((dir / "tstFile.ext").string(), 
content_repo);
+    const auto claim = std::make_shared<minifi::ResourceClaim>((dir / 
"tstFile.ext").string(), content_repo);
     minifi::FlowFileRecord record;
     record.setResourceClaim(claim);
 
@@ -217,7 +217,7 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
 
   auto dir = testController.createTempDirectory();
 
-  std::shared_ptr<core::repository::FlowFileRepository> repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 
0, 1ms);
+  const auto repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 
0, 1ms);
 
   std::fstream file;
   file.open(dir / "tstFile.ext", std::ios::out);
@@ -232,7 +232,8 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
 
   repository->loadComponent(content_repo);
 
-  std::shared_ptr<minifi::ResourceClaim> claim = 
std::make_shared<minifi::ResourceClaim>((dir / "tstFile.ext").string(), 
content_repo);
+  auto claim = std::make_shared<minifi::ResourceClaim>((dir / 
"tstFile.ext").string(), content_repo);
+
   {
     minifi::FlowFileRecord record;
     record.setResourceClaim(claim);
@@ -306,9 +307,6 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
   });
   auto flowController = std::make_shared<minifi::FlowController>(prov_repo, 
ff_repository, config, std::move(flowConfig), content_repo);
 
-  std::string data = "banana";
-  minifi::io::BufferStream content(data);
-
   /**
    * Currently it is the Connection's responsibility to persist the incoming
    * flowFiles to the FlowFileRepository. Upon restart the FlowFileRepository
@@ -317,6 +315,8 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
    * which case the orphan FlowFiles are deleted.)
    */
   {
+    std::string data = "banana";
+    minifi::io::BufferStream content(data);
     std::shared_ptr<core::Processor> processor = 
std::make_shared<TestProcessor>("dummy");
     utils::Identifier uuid = processor->getUUID();
     REQUIRE(uuid);
@@ -345,7 +345,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
   // this will first check the persisted repo and restore all FlowFiles
   // that still has an owner Connectable
   ff_repository->start();
-  LogTestController::getInstance().contains("Found connection for");
+  CHECK(LogTestController::getInstance().contains("Found connection for"));
 
   // check if the @input Connection's FlowFile was restored
   // upon the FlowFileRepository's startup
@@ -379,13 +379,13 @@ TEST_CASE("Flush deleted flowfiles before shutdown", 
"[TestFFR7]") {
   };
 
   TestController testController;
-  auto dir = testController.createTempDirectory();
+  const auto dir = testController.createTempDirectory();
 
-  auto config = std::make_shared<minifi::Configure>();
+  const auto config = std::make_shared<minifi::Configure>();
   config->setHome(dir);
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
(dir / "flowfile_repository").string());
 
-  auto content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
+  const auto content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
 
   auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, 
"Connection");
   std::map<std::string, core::Connectable*> 
connectionMap{{connection->getUUIDStr(), connection.get()}};
@@ -395,7 +395,7 @@ TEST_CASE("Flush deleted flowfiles before shutdown", 
"[TestFFR7]") {
     int flush_counter{0};
     std::atomic<bool> stop{false};
 
-    std::shared_ptr<TestFlowFileRepository> ff_repository = 
std::make_shared<TestFlowFileRepository>("flowFileRepository");
+    const auto ff_repository = 
std::make_shared<TestFlowFileRepository>("flowFileRepository");
 
     std::thread shutdown{[&] {
       while (!stop.load()) {
@@ -406,7 +406,7 @@ TEST_CASE("Flush deleted flowfiles before shutdown", 
"[TestFFR7]") {
 
     ff_repository->onFlush_ = [&] {
       {
-        std::lock_guard<std::mutex> lock(flush_counter_mutex);
+        std::lock_guard lock(flush_counter_mutex);
         if (++flush_counter != 1) {
           return;
         }
@@ -439,7 +439,7 @@ TEST_CASE("Flush deleted flowfiles before shutdown", 
"[TestFFR7]") {
 
   // check if the deleted flowfiles are indeed deleted
   {
-    std::shared_ptr<TestFlowFileRepository> ff_repository = 
std::make_shared<TestFlowFileRepository>("flowFileRepository");
+    const auto ff_repository = 
std::make_shared<TestFlowFileRepository>("flowFileRepository");
     ff_repository->setConnectionMap(connectionMap);
     REQUIRE(ff_repository->initialize(config));
     ff_repository->loadComponent(content_repo);
@@ -490,18 +490,18 @@ TEST_CASE("FlowFileRepository synchronously pushes 
existing flow files") {
   
LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
   
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
   TestController testController;
-  auto home_dir = testController.createTempDirectory();
-  auto ff_dir = testController.createTempDirectory();
-  auto content_dir = testController.createTempDirectory();
+  const auto home_dir = testController.createTempDirectory();
+  const auto ff_dir = testController.createTempDirectory();
+  const auto content_dir = testController.createTempDirectory();
 
-  auto config = std::make_shared<minifi::Configure>();
+  const auto config = std::make_shared<minifi::Configure>();
   config->setHome(home_dir);
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
ff_dir.string());
   config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, 
content_dir.string());
 
 
   utils::Identifier ff_id;
-  auto connection_id = utils::IdGenerator::getIdGenerator()->generate();
+  const auto connection_id = utils::IdGenerator::getIdGenerator()->generate();
 
   {
     auto ff_repo = std::make_shared<core::repository::FlowFileRepository>();
@@ -621,15 +621,15 @@ TEST_CASE("Test getting flow file repository size 
properties", "[TestGettingRepo
 
 TEST_CASE("Test getting noop repository size properties", 
"[TestGettingRepositorySize]") {
   TestController testController;
-  auto dir = testController.createTempDirectory();
+  const auto dir = testController.createTempDirectory();
 
-  auto repository = minifi::core::createRepository("NoOpRepository", "ff");
+  const auto repository = minifi::core::createRepository("NoOpRepository", 
"ff");
 
   const auto configuration = std::make_shared<minifi::Configure>();
   configuration->setHome(dir);
   repository->initialize(configuration);
 
-  auto flow_file = std::make_shared<minifi::FlowFileRecord>();
+  const auto flow_file = std::make_shared<minifi::FlowFileRecord>();
 
   flow_file->addAttribute("key", "testattributevalue");
 
@@ -648,12 +648,12 @@ TEST_CASE("Test getting content repository size 
properties", "[TestGettingReposi
   
LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
   
LogTestController::getInstance().setDebug<core::repository::DatabaseContentRepository>();
   TestController testController;
-  auto dir = testController.createTempDirectory();
+  const auto dir = testController.createTempDirectory();
 
-  auto repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 
0, 1ms);
+  const auto repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 
0, 1ms);
 
-  auto content_repo_dir = testController.createTempDirectory();
-  auto configuration = std::make_shared<minifi::Configure>();
+  const auto content_repo_dir = testController.createTempDirectory();
+  const auto configuration = std::make_shared<minifi::Configure>();
   configuration->setHome(dir);
   
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
 content_repo_dir.string());
   std::string content = "content";
@@ -723,7 +723,7 @@ TEST_CASE("Flow file repositories can be stopped", 
"[TestRepoIsRunning]") {
   
LogTestController::getInstance().setDebug<core::repository::VolatileFlowFileRepository>();
   
LogTestController::getInstance().setDebug<core::repository::VolatileProvenanceRepository>();
   TestController testController;
-  auto dir = testController.createTempDirectory();
+  const auto dir = testController.createTempDirectory();
 
   std::shared_ptr<core::Repository> repository;
   SECTION("FlowFileRepository") {
@@ -743,7 +743,7 @@ TEST_CASE("Flow file repositories can be stopped", 
"[TestRepoIsRunning]") {
   }
 
   SECTION("NoOpRepository") {
-    repository = minifi::core::createRepository("NoOpRepository", "ff");
+    repository = core::createRepository("NoOpRepository", "ff");
   }
 
   const auto configuration = std::make_shared<minifi::Configure>();
@@ -763,7 +763,7 @@ TEST_CASE("Content repositories are always running", 
"[TestRepoIsRunning]") {
   
LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
   
LogTestController::getInstance().setDebug<core::repository::DatabaseContentRepository>();
   TestController testController;
-  auto dir = testController.createTempDirectory();
+  const auto dir = testController.createTempDirectory();
 
   std::shared_ptr<core::ContentRepository> content_repo;
   SECTION("FileSystemRepository") {
@@ -781,4 +781,69 @@ TEST_CASE("Content repositories are always running", 
"[TestRepoIsRunning]") {
   REQUIRE(content_repo->isRunning());
 }
 
+std::shared_ptr<minifi::FlowFileRecord> 
createFlowFileWithContent(core::ContentRepository& content_repo, 
std::string_view content) {
+  auto flow_file = std::make_shared<minifi::FlowFileRecord>();
+  const auto content_session = content_repo.createSession();
+  const auto claim = content_session->create();
+  const auto stream = content_session->write(claim);
+
+  stream->write(utils::as_span<const std::byte>(std::span(content)));
+  flow_file->setResourceClaim(claim);
+  flow_file->setSize(stream->size());
+  flow_file->setOffset(0);
+  stream->close();
+  content_session->commit();
+  return flow_file;
+}
+
+void corruptFlowFile(core::FlowFile& ff) {
+  ff.setSize(ff.getSize()*2);
+}
+
+TEST_CASE("FlowFileRepository can filter out too small contents") {
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
+  TestController testController;
+  const auto ff_dir = testController.createTempDirectory();
+  const auto content_dir = testController.createTempDirectory();
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, 
ff_dir.string());
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, 
content_dir.string());
+
+  const auto [check_health, expected_flowfiles] = GENERATE(
+      std::make_tuple("false", size_t{2}),
+      std::make_tuple("true", size_t{1}));
+  config->set(minifi::Configure::nifi_flow_file_repository_check_health, 
check_health);
+  const auto content_repo = GENERATE(
+      
static_cast<std::shared_ptr<core::ContentRepository>>(std::make_shared<core::repository::FileSystemRepository>()),
+      
static_cast<std::shared_ptr<core::ContentRepository>>(std::make_shared<core::repository::DatabaseContentRepository>()));
+
+
+  auto ff_repo = std::make_shared<core::repository::FlowFileRepository>();
+  REQUIRE(ff_repo->initialize(config));
+  REQUIRE(content_repo->initialize(config));
+  auto connection_id = utils::IdGenerator::getIdGenerator()->generate();
+  auto connection = std::make_shared<minifi::Connection>(ff_repo, 
content_repo, "TestConnection", connection_id);
+
+  auto first_flow_file = createFlowFileWithContent(*content_repo, "foo");
+  auto second_flow_file = createFlowFileWithContent(*content_repo, "bar");
+
+  std::map<std::string, core::Connectable*> 
connection_map{{connection->getUUIDStr(), connection.get()}};
+
+  first_flow_file->setConnection(connection.get());
+  first_flow_file->Persist(ff_repo);
+
+  second_flow_file->setConnection(connection.get());
+  corruptFlowFile(*second_flow_file);
+  second_flow_file->Persist(ff_repo);
+
+  ff_repo->setConnectionMap(connection_map);
+
+  CHECK(connection->getQueueSize() == 0);
+  ff_repo->loadComponent(content_repo);
+  CHECK(connection->getQueueSize() == expected_flowfiles);
+}
+
 }  // namespace
diff --git a/libminifi/include/FlowFileRecord.h 
b/libminifi/include/FlowFileRecord.h
index 928407e45..15d72b464 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -66,7 +66,7 @@ class FlowFileRecord : public core::FlowFile {
   static std::shared_ptr<FlowFileRecord> DeSerialize(const std::string& key, 
const std::shared_ptr<core::Repository>& flowRepository,
       const std::shared_ptr<core::ContentRepository> &content_repo, 
utils::Identifier &container);
 
-  std::string getContentFullPath() {
+  std::string getContentFullPath() const {
     return claim_ ? claim_->getContentFullPath() : "";
   }
 
diff --git a/libminifi/include/core/StreamManager.h 
b/libminifi/include/core/StreamManager.h
index 8c73169c7..52ff59e0b 100644
--- a/libminifi/include/core/StreamManager.h
+++ b/libminifi/include/core/StreamManager.h
@@ -67,7 +67,12 @@ class StreamManager {
    */
   virtual std::shared_ptr<io::BaseStream> read(const T &streamId) = 0;
 
-  virtual size_t size(const T &streamId) {return read(streamId)->size();}
+  virtual size_t size(const T &streamId) {
+    auto stream = read(streamId);
+    if (!stream)  // treat a stream that could not be opened as empty
+      return 0;
+    return stream->size();
+  }
 
   /**
    * Closes the stream
diff --git a/libminifi/include/core/repository/FileSystemRepository.h 
b/libminifi/include/core/repository/FileSystemRepository.h
index 160714101..d8c6b50e6 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -18,11 +18,8 @@
 
 #pragma once
 
-#include <memory>
 #include <string>
 #include <string_view>
-#include <utility>
-#include <algorithm>
 
 #include "../ContentRepository.h"
 #include "properties/Configure.h"
@@ -31,21 +28,25 @@
 
 namespace org::apache::nifi::minifi::core::repository {
 
-class FileSystemRepository : public core::ContentRepository {
+class FileSystemRepository : public ContentRepository {
  public:
-  explicit FileSystemRepository(std::string_view name = 
className<FileSystemRepository>())
-    : core::ContentRepository(name),
+  explicit FileSystemRepository(const std::string_view name = 
className<FileSystemRepository>())
+    : ContentRepository(name),
       logger_(logging::LoggerFactory<FileSystemRepository>::getLogger()) {
   }
 
+  FileSystemRepository(FileSystemRepository&&) = delete;
+  FileSystemRepository(const FileSystemRepository&) = delete;
+  FileSystemRepository& operator=(FileSystemRepository&&) = delete;
+  FileSystemRepository& operator=(const FileSystemRepository&) = delete;
   ~FileSystemRepository() override = default;
 
-  bool initialize(const std::shared_ptr<minifi::Configure>& configuration) 
override;
-  bool exists(const minifi::ResourceClaim& streamId) override;
-  std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim& claim, 
bool append = false) override;
-  std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim& claim) 
override;
+  bool initialize(const std::shared_ptr<Configure>& configuration) override;
+  bool exists(const ResourceClaim& streamId) override;
+  std::shared_ptr<io::BaseStream> write(const ResourceClaim& claim, bool 
append = false) override;
+  std::shared_ptr<io::BaseStream> read(const ResourceClaim& claim) override;
 
-  bool close(const minifi::ResourceClaim& claim) override {
+  bool close(const ResourceClaim& claim) override {
     return remove(claim);
   }
 
@@ -57,13 +58,9 @@ class FileSystemRepository : public core::ContentRepository {
     return utils::file::path_size(directory_);
   }
 
-  uint64_t getRepositoryEntryCount() const override {
-    auto dir_it = std::filesystem::recursive_directory_iterator(directory_, 
std::filesystem::directory_options::skip_permission_denied);
-    return std::count_if(
-      std::filesystem::begin(dir_it),
-      std::filesystem::end(dir_it),
-      [](auto& entry) { return entry.is_regular_file(); });
-  }
+  size_t size(const ResourceClaim& claim) override;
+
+  uint64_t getRepositoryEntryCount() const override;
 
  protected:
   bool removeKey(const std::string& content_path) override;
diff --git a/libminifi/include/properties/Configuration.h 
b/libminifi/include/properties/Configuration.h
index e50bc3115..ab1c1b81e 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -199,6 +199,8 @@ class Configuration : public Properties {
   static constexpr const char *controller_socket_port = 
"controller.socket.port";
   static constexpr const char *controller_ssl_context_service = 
"controller.ssl.context.service";
 
+  static constexpr const char *nifi_flow_file_repository_check_health = 
"nifi.flowfile.repository.check.health";
+
   MINIFIAPI static const std::unordered_map<std::string_view, 
gsl::not_null<const core::PropertyValidator*>> CONFIGURATION_PROPERTIES;
   MINIFIAPI static const std::array<const char*, 2> 
DEFAULT_SENSITIVE_PROPERTIES;
 
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp 
b/libminifi/src/core/repository/FileSystemRepository.cpp
index cdbab8633..99c69f759 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -17,7 +17,6 @@
  */
 
 #include "core/repository/FileSystemRepository.h"
-#include <memory>
 #include <string>
 #include <filesystem>
 #include "io/FileStream.h"
@@ -26,9 +25,8 @@
 
 namespace org::apache::nifi::minifi::core::repository {
 
-bool FileSystemRepository::initialize(const 
std::shared_ptr<minifi::Configure>& configuration) {
-  std::string directory_str;
-  if 
(configuration->get(Configure::nifi_dbcontent_repository_directory_default, 
directory_str) && !directory_str.empty()) {
+bool FileSystemRepository::initialize(const std::shared_ptr<Configure>& 
configuration) {
+  if (std::string directory_str; 
configuration->get(Configure::nifi_dbcontent_repository_directory_default, 
directory_str) && !directory_str.empty()) {
     directory_ = directory_str;
   } else {
     directory_ = configuration->getHome().string();
@@ -37,23 +35,23 @@ bool FileSystemRepository::initialize(const 
std::shared_ptr<minifi::Configure>&
   return true;
 }
 
-std::shared_ptr<io::BaseStream> FileSystemRepository::write(const 
minifi::ResourceClaim& claim, bool append) {
+std::shared_ptr<io::BaseStream> FileSystemRepository::write(const 
ResourceClaim& claim, bool append) {
   return std::make_shared<io::FileStream>(claim.getContentFullPath(), append);
 }
 
-bool FileSystemRepository::exists(const minifi::ResourceClaim& streamId) {
-  std::ifstream file(streamId.getContentFullPath());
+bool FileSystemRepository::exists(const ResourceClaim& streamId) {
+  const std::ifstream file(streamId.getContentFullPath());
   return file.good();
 }
 
-std::shared_ptr<io::BaseStream> FileSystemRepository::read(const 
minifi::ResourceClaim& claim) {
+std::shared_ptr<io::BaseStream> FileSystemRepository::read(const 
ResourceClaim& claim) {
   return std::make_shared<io::FileStream>(claim.getContentFullPath(), 0, 
false);
 }
 
 bool FileSystemRepository::removeKey(const std::string& content_path) {
   logger_->log_debug("Deleting resource {}", content_path);
   std::error_code ec;
-  auto result = std::filesystem::exists(content_path, ec);
+  const auto result = std::filesystem::exists(content_path, ec);
   if (ec) {
     logger_->log_error("Deleting {} from content repository failed with the 
following error: {}", content_path, ec.message());
     return false;
@@ -79,14 +77,13 @@ void FileSystemRepository::clearOrphans() {
     auto path = directory_ +  "/" + filename.string();
     bool is_orphan = false;
     {
-      std::lock_guard<std::mutex> lock(count_map_mutex_);
+      std::lock_guard lock(count_map_mutex_);
       auto it = count_map_.find(path);
       is_orphan = it == count_map_.end() || it->second == 0;
     }
     if (is_orphan) {
       logger_->log_debug("Deleting orphan resource {}", path);
-      std::error_code ec;
-      if (!std::filesystem::remove(path, ec)) {
+      if (std::error_code ec; !std::filesystem::remove(path, ec)) {
         {
           std::lock_guard<std::mutex> lock(purge_list_mutex_);
           purge_list_.push_back(path);
@@ -98,4 +95,20 @@ void FileSystemRepository::clearOrphans() {
   }, logger_, false);
 }
 
+size_t FileSystemRepository::size(const ResourceClaim& claim) {
+  std::error_code ec;
+  const auto size = std::filesystem::file_size(claim.getContentFullPath(), ec);
+  if (ec)
+    return 0;
+  return size;
+}
+
+uint64_t FileSystemRepository::getRepositoryEntryCount() const {
+  auto dir_it = std::filesystem::recursive_directory_iterator(directory_, 
std::filesystem::directory_options::skip_permission_denied);
+  return std::count_if(
+    begin(dir_it),
+    end(dir_it),
+    [](auto& entry) { return entry.is_regular_file(); });
+}
+
 }  // namespace org::apache::nifi::minifi::core::repository


Reply via email to