This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 260c229fe3b64265bff269deac0311407ec4e183 Author: Martin Zink <martin.z...@protonmail.com> AuthorDate: Thu Mar 14 14:07:34 2024 +0100 MINIFICPP-2306 Filter out corrupt flowfiles during startup Closes #1738 Signed-off-by: Marton Szasz <sza...@apache.org> --- CONFIGURE.md | 4 + conf/minifi.properties | 1 + extensions/rocksdb-repos/FlowFileRepository.cpp | 76 +++++++---- extensions/rocksdb-repos/FlowFileRepository.h | 7 + extensions/rocksdb-repos/tests/RepoTests.cpp | 145 +++++++++++++++------ libminifi/include/FlowFileRecord.h | 2 +- libminifi/include/core/StreamManager.h | 7 +- .../include/core/repository/FileSystemRepository.h | 33 +++-- libminifi/include/properties/Configuration.h | 2 + .../src/core/repository/FileSystemRepository.cpp | 37 ++++-- 10 files changed, 216 insertions(+), 98 deletions(-) diff --git a/CONFIGURE.md b/CONFIGURE.md index 2df8d695a..d057bc760 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -362,6 +362,10 @@ The Content Repository can be configured with the `nifi.content.repository.class # in minifi.properties nifi.content.repository.class.name=FileSystemRepository +During startup, MiNiFi checks whether the flowfiles and their respective content are in good health (corruption can occur in rare cases due to ungraceful shutdowns) and filters out any corrupt flowfiles. +This can slow down startup if there is a significant number of flowfiles. This health check can be disabled by setting `nifi.flowfile.repository.check.health` to `false`. + + The Provenance Repository can be configured with the `nifi.provenance.repository.class.name` property. If not specified, it uses the `ProvenanceRepository` class by default, which persists the provenance events in a RocksDB database. Alternatively it can be configured to use a `VolatileProvenanceRepository` that keeps the state in memory (so the state gets lost upon restart), or the `NoOpRepository` to not keep track of the provenance events. By default we do not keep track of the proven [...] 
# in minifi.properties diff --git a/conf/minifi.properties b/conf/minifi.properties index d2d679f35..9ba8a0a08 100644 --- a/conf/minifi.properties +++ b/conf/minifi.properties @@ -29,6 +29,7 @@ nifi.provenance.repository.max.storage.time=1 MIN nifi.provenance.repository.max.storage.size=1 MB nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository # nifi.flowfile.repository.rocksdb.compression=auto +# nifi.flowfile.repository.check.health=true nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository nifi.provenance.repository.class.name=NoOpRepository nifi.content.repository.class.name=DatabaseContentRepository diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp index ca81c795a..abdc2fcf0 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.cpp +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -25,7 +25,6 @@ #include <vector> #include "rocksdb/options.h" -#include "rocksdb/write_batch.h" #include "rocksdb/slice.h" #include "FlowFileRecord.h" #include "utils/gsl.h" @@ -119,6 +118,22 @@ void FlowFileRepository::run() { flush(); } +bool FlowFileRepository::contentSizeIsAmpleForFlowFile(const FlowFileRecord& flow_file_record, const std::shared_ptr<ResourceClaim>& resource_claim) const { + const auto stream_size = resource_claim ? 
content_repo_->size(*resource_claim) : 0; + const auto required_size = flow_file_record.getOffset() + flow_file_record.getSize(); + return stream_size >= required_size; +} + +Connectable* FlowFileRepository::getContainer(const std::string& container_id) { + auto container = containers_.find(container_id); + if (container != containers_.end()) + return container->second; + // for backward compatibility + container = connection_map_.find(container_id); + if (container != connection_map_.end()) + return container->second; + return nullptr; +} void FlowFileRepository::initialize_repository() { auto opendb = db_->open(); if (!opendb) { @@ -127,37 +142,37 @@ void FlowFileRepository::initialize_repository() { } logger_->log_info("Reading existing flow files from database"); - auto it = opendb->NewIterator(rocksdb::ReadOptions()); + const auto it = opendb->NewIterator(rocksdb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { utils::Identifier container_id; auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>(), content_repo_, container_id); - std::string key = it->key().ToString(); - if (eventRead) { - // on behalf of the just resurrected persisted instance - auto claim = eventRead->getResourceClaim(); - if (claim) claim->increaseFlowFileRecordOwnedCount(); - bool found = false; - auto search = containers_.find(container_id.to_string()); - found = (search != containers_.end()); - if (!found) { - // for backward compatibility - search = connection_map_.find(container_id.to_string()); - found = (search != connection_map_.end()); - } - if (found) { - logger_->log_debug("Found connection for {}, path {} ", container_id.to_string(), eventRead->getContentFullPath()); - eventRead->setStoredToRepository(true); - // we found the connection for the persistent flowFile - // even if a processor immediately marks it for deletion, flush only happens after prune_stored_flowfiles - search->second->restore(eventRead); - } else 
{ - logger_->log_warn("Could not find connection for {}, path {} ", container_id.to_string(), eventRead->getContentFullPath()); - keys_to_delete_.enqueue({.key = key, .content = eventRead->getResourceClaim()}); - } - } else { + const std::string key = it->key().ToString(); + if (!eventRead) { // failed to deserialize FlowFile, cannot clear claim keys_to_delete_.enqueue({.key = key}); + continue; + } + auto claim = eventRead->getResourceClaim(); + if (claim) { + claim->increaseFlowFileRecordOwnedCount(); } + const auto container = getContainer(container_id.to_string()); + if (!container) { + logger_->log_warn("Could not find connection for {}, path {} ", container_id.to_string(), eventRead->getContentFullPath()); + keys_to_delete_.enqueue({.key = key, .content = eventRead->getResourceClaim()}); + continue; + } + if (check_flowfile_content_size_ && !contentSizeIsAmpleForFlowFile(*eventRead, claim)) { + logger_->log_warn("Content is missing or too small for flowfile {}", eventRead->getContentFullPath()); + keys_to_delete_.enqueue({.key = key, .content = eventRead->getResourceClaim()}); + continue; + } + + logger_->log_debug("Found connection for {}, path {} ", container_id.to_string(), eventRead->getContentFullPath()); + eventRead->setStoredToRepository(true); + // we found the connection for the persistent flowFile + // even if a processor immediately marks it for deletion, flush only happens after prune_stored_flowfiles + container->restore(eventRead); } flush(); content_repo_->clearOrphans(); @@ -170,6 +185,14 @@ void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentReposi initialize_repository(); } +namespace { +bool getRepositoryCheckHealth(const Configure& configure) { + std::string check_health_str; + configure.get(Configure::nifi_flow_file_repository_check_health, check_health_str); + return utils::string::toBool(check_health_str).value_or(true); +} +} // namespace + bool FlowFileRepository::initialize(const std::shared_ptr<Configure> 
&configure) { config_ = configure; std::string value; @@ -177,6 +200,7 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure) if (configure->get(Configure::nifi_flowfile_repository_directory_default, value) && !value.empty()) { directory_ = value; } + check_flowfile_content_size_ = getRepositoryCheckHealth(*configure); logger_->log_debug("NiFi FlowFile Repository Directory {}", directory_); setCompactionPeriod(configure); diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h index d22ea0de1..7e9b071b1 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.h +++ b/extensions/rocksdb-repos/FlowFileRepository.h @@ -41,6 +41,9 @@ #include "utils/StoppableThread.h" #include "RocksDbRepository.h" +namespace org::apache::nifi::minifi { +class FlowFileRecord; +} namespace org::apache::nifi::minifi::core::repository { #ifdef WIN32 @@ -114,6 +117,9 @@ class FlowFileRepository : public RocksDbRepository, public SwapManager { void deserializeFlowFilesWithNoContentClaim(minifi::internal::OpenRocksDb& opendb, std::list<ExpiredFlowFileInfo>& flow_files); + bool contentSizeIsAmpleForFlowFile(const FlowFileRecord& flow_file_record, const std::shared_ptr<ResourceClaim>& resource_claim) const; + Connectable* getContainer(const std::string& container_id); + moodycamel::ConcurrentQueue<ExpiredFlowFileInfo> keys_to_delete_; std::shared_ptr<core::ContentRepository> content_repo_; std::unique_ptr<FlowFileLoader> swap_loader_; @@ -121,6 +127,7 @@ class FlowFileRepository : public RocksDbRepository, public SwapManager { std::chrono::milliseconds compaction_period_; std::unique_ptr<utils::StoppableThread> compaction_thread_; + bool check_flowfile_content_size_ = true; }; } // namespace org::apache::nifi::minifi::core::repository diff --git a/extensions/rocksdb-repos/tests/RepoTests.cpp b/extensions/rocksdb-repos/tests/RepoTests.cpp index a253a89bc..fedd42e15 100644 --- 
a/extensions/rocksdb-repos/tests/RepoTests.cpp +++ b/extensions/rocksdb-repos/tests/RepoTests.cpp @@ -18,7 +18,6 @@ #include <chrono> #include <map> -#include <memory> #include <string> #include <thread> #include <optional> @@ -39,13 +38,14 @@ #include "core/repository/VolatileFlowFileRepository.h" #include "core/repository/VolatileProvenanceRepository.h" #include "DatabaseContentRepository.h" +#include "catch2/generators/catch_generators.hpp" using namespace std::literals::chrono_literals; namespace { namespace { -class TestProcessor : public minifi::core::Processor { +class TestProcessor final : public core::Processor { public: using Processor::Processor; @@ -58,10 +58,10 @@ class TestProcessor : public minifi::core::Processor { } // namespace TEST_CASE("Test Repo Names", "[TestFFR1]") { - auto repoA = minifi::core::createRepository("FlowFileRepository", "flowfile"); + const auto repoA = minifi::core::createRepository("FlowFileRepository", "flowfile"); REQUIRE("flowfile" == repoA->getName()); - auto repoB = minifi::core::createRepository("ProvenanceRepository", "provenance"); + const auto repoB = minifi::core::createRepository("ProvenanceRepository", "provenance"); REQUIRE("provenance" == repoB->getName()); } @@ -70,15 +70,15 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") { LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>(); LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); TestController testController; - auto dir = testController.createTempDirectory(); - std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); + const auto dir = testController.createTempDirectory(); + const auto repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); const auto configuration = std::make_shared<minifi::Configure>(); configuration->setHome(dir); 
repository->initialize(configuration); std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - auto file = std::make_shared<minifi::FlowFileRecord>(); + const auto file = std::make_shared<minifi::FlowFileRecord>(); file->addAttribute("keyA", ""); @@ -92,14 +92,14 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") { LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>(); LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); TestController testController; - auto dir = testController.createTempDirectory(); - std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); + const auto dir = testController.createTempDirectory(); + const auto repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); const auto configuration = std::make_shared<minifi::Configure>(); configuration->setHome(dir); repository->initialize(configuration); std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - auto file = std::make_shared<minifi::FlowFileRecord>(); + const auto file = std::make_shared<minifi::FlowFileRecord>(); file->addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd"); @@ -116,7 +116,7 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") { LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); TestController testController; auto dir = testController.createTempDirectory(); - std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); + const auto repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); const auto configuration = std::make_shared<minifi::Configure>(); 
configuration->setHome(dir); @@ -166,7 +166,7 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") { auto dir = testController.createTempDirectory(); - std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); + const auto repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); std::fstream file; file.open(dir / "tstFile.ext", std::ios::out); @@ -183,7 +183,7 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") { { - std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>((dir / "tstFile.ext").string(), content_repo); + const auto claim = std::make_shared<minifi::ResourceClaim>((dir / "tstFile.ext").string(), content_repo); minifi::FlowFileRecord record; record.setResourceClaim(claim); @@ -217,7 +217,7 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") { auto dir = testController.createTempDirectory(); - std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); + const auto repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); std::fstream file; file.open(dir / "tstFile.ext", std::ios::out); @@ -232,7 +232,8 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") { repository->loadComponent(content_repo); - std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>((dir / "tstFile.ext").string(), content_repo); + auto claim = std::make_shared<minifi::ResourceClaim>((dir / "tstFile.ext").string(), content_repo); + { minifi::FlowFileRecord record; record.setResourceClaim(claim); @@ -306,9 +307,6 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") { }); auto flowController = std::make_shared<minifi::FlowController>(prov_repo, ff_repository, config, std::move(flowConfig), content_repo); - std::string data = "banana"; - 
minifi::io::BufferStream content(data); - /** * Currently it is the Connection's responsibility to persist the incoming * flowFiles to the FlowFileRepository. Upon restart the FlowFileRepository @@ -317,6 +315,8 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") { * which case the orphan FlowFiles are deleted.) */ { + std::string data = "banana"; + minifi::io::BufferStream content(data); std::shared_ptr<core::Processor> processor = std::make_shared<TestProcessor>("dummy"); utils::Identifier uuid = processor->getUUID(); REQUIRE(uuid); @@ -345,7 +345,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") { // this will first check the persisted repo and restore all FlowFiles // that still has an owner Connectable ff_repository->start(); - LogTestController::getInstance().contains("Found connection for"); + CHECK(LogTestController::getInstance().contains("Found connection for")); // check if the @input Connection's FlowFile was restored // upon the FlowFileRepository's startup @@ -379,13 +379,13 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") { }; TestController testController; - auto dir = testController.createTempDirectory(); + const auto dir = testController.createTempDirectory(); - auto config = std::make_shared<minifi::Configure>(); + const auto config = std::make_shared<minifi::Configure>(); config->setHome(dir); config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string()); - auto content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>(); auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection"); std::map<std::string, core::Connectable*> connectionMap{{connection->getUUIDStr(), connection.get()}}; @@ -395,7 +395,7 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") { int flush_counter{0}; std::atomic<bool> stop{false}; - 
std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository"); + const auto ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository"); std::thread shutdown{[&] { while (!stop.load()) { @@ -406,7 +406,7 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") { ff_repository->onFlush_ = [&] { { - std::lock_guard<std::mutex> lock(flush_counter_mutex); + std::lock_guard lock(flush_counter_mutex); if (++flush_counter != 1) { return; } @@ -439,7 +439,7 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") { // check if the deleted flowfiles are indeed deleted { - std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository"); + const auto ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository"); ff_repository->setConnectionMap(connectionMap); REQUIRE(ff_repository->initialize(config)); ff_repository->loadComponent(content_repo); @@ -490,18 +490,18 @@ TEST_CASE("FlowFileRepository synchronously pushes existing flow files") { LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>(); LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); TestController testController; - auto home_dir = testController.createTempDirectory(); - auto ff_dir = testController.createTempDirectory(); - auto content_dir = testController.createTempDirectory(); + const auto home_dir = testController.createTempDirectory(); + const auto ff_dir = testController.createTempDirectory(); + const auto content_dir = testController.createTempDirectory(); - auto config = std::make_shared<minifi::Configure>(); + const auto config = std::make_shared<minifi::Configure>(); config->setHome(home_dir); config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string()); config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, 
content_dir.string()); utils::Identifier ff_id; - auto connection_id = utils::IdGenerator::getIdGenerator()->generate(); + const auto connection_id = utils::IdGenerator::getIdGenerator()->generate(); { auto ff_repo = std::make_shared<core::repository::FlowFileRepository>(); @@ -621,15 +621,15 @@ TEST_CASE("Test getting flow file repository size properties", "[TestGettingRepo TEST_CASE("Test getting noop repository size properties", "[TestGettingRepositorySize]") { TestController testController; - auto dir = testController.createTempDirectory(); + const auto dir = testController.createTempDirectory(); - auto repository = minifi::core::createRepository("NoOpRepository", "ff"); + const auto repository = minifi::core::createRepository("NoOpRepository", "ff"); const auto configuration = std::make_shared<minifi::Configure>(); configuration->setHome(dir); repository->initialize(configuration); - auto flow_file = std::make_shared<minifi::FlowFileRecord>(); + const auto flow_file = std::make_shared<minifi::FlowFileRecord>(); flow_file->addAttribute("key", "testattributevalue"); @@ -648,12 +648,12 @@ TEST_CASE("Test getting content repository size properties", "[TestGettingReposi LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>(); LogTestController::getInstance().setDebug<core::repository::DatabaseContentRepository>(); TestController testController; - auto dir = testController.createTempDirectory(); + const auto dir = testController.createTempDirectory(); - auto repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); + const auto repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms); - auto content_repo_dir = testController.createTempDirectory(); - auto configuration = std::make_shared<minifi::Configure>(); + const auto content_repo_dir = testController.createTempDirectory(); + const auto configuration = std::make_shared<minifi::Configure>(); 
configuration->setHome(dir); configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_repo_dir.string()); std::string content = "content"; @@ -723,7 +723,7 @@ TEST_CASE("Flow file repositories can be stopped", "[TestRepoIsRunning]") { LogTestController::getInstance().setDebug<core::repository::VolatileFlowFileRepository>(); LogTestController::getInstance().setDebug<core::repository::VolatileProvenanceRepository>(); TestController testController; - auto dir = testController.createTempDirectory(); + const auto dir = testController.createTempDirectory(); std::shared_ptr<core::Repository> repository; SECTION("FlowFileRepository") { @@ -743,7 +743,7 @@ TEST_CASE("Flow file repositories can be stopped", "[TestRepoIsRunning]") { } SECTION("NoOpRepository") { - repository = minifi::core::createRepository("NoOpRepository", "ff"); + repository = core::createRepository("NoOpRepository", "ff"); } const auto configuration = std::make_shared<minifi::Configure>(); @@ -763,7 +763,7 @@ TEST_CASE("Content repositories are always running", "[TestRepoIsRunning]") { LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>(); LogTestController::getInstance().setDebug<core::repository::DatabaseContentRepository>(); TestController testController; - auto dir = testController.createTempDirectory(); + const auto dir = testController.createTempDirectory(); std::shared_ptr<core::ContentRepository> content_repo; SECTION("FileSystemRepository") { @@ -781,4 +781,69 @@ TEST_CASE("Content repositories are always running", "[TestRepoIsRunning]") { REQUIRE(content_repo->isRunning()); } +std::shared_ptr<minifi::FlowFileRecord> createFlowFileWithContent(core::ContentRepository& content_repo, std::string_view content) { + auto flow_file = std::make_shared<minifi::FlowFileRecord>(); + const auto content_session = content_repo.createSession(); + const auto claim = content_session->create(); + const auto stream = content_session->write(claim); 
+ + stream->write(utils::as_span<const std::byte>(std::span(content))); + flow_file->setResourceClaim(claim); + flow_file->setSize(stream->size()); + flow_file->setOffset(0); + stream->close(); + content_session->commit(); + return flow_file; +} + +void corruptFlowFile(core::FlowFile& ff) { + ff.setSize(ff.getSize()*2); +} + +TEST_CASE("FlowFileRepository can filter out too small contents") { + LogTestController::getInstance().setDebug<core::ContentRepository>(); + LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>(); + LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); + TestController testController; + const auto ff_dir = testController.createTempDirectory(); + const auto content_dir = testController.createTempDirectory(); + + auto config = std::make_shared<minifi::Configure>(); + config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string()); + config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string()); + + const auto [check_health, expected_flowfiles] = GENERATE( + std::make_tuple("false", size_t{2}), + std::make_tuple("true", size_t{1})); + config->set(minifi::Configure::nifi_flow_file_repository_check_health, check_health); + const auto content_repo = GENERATE( + static_cast<std::shared_ptr<core::ContentRepository>>(std::make_shared<core::repository::FileSystemRepository>()), + static_cast<std::shared_ptr<core::ContentRepository>>(std::make_shared<core::repository::DatabaseContentRepository>())); + + + auto ff_repo = std::make_shared<core::repository::FlowFileRepository>(); + REQUIRE(ff_repo->initialize(config)); + REQUIRE(content_repo->initialize(config)); + auto connection_id = utils::IdGenerator::getIdGenerator()->generate(); + auto connection = std::make_shared<minifi::Connection>(ff_repo, content_repo, "TestConnection", connection_id); + + auto first_flow_file = createFlowFileWithContent(*content_repo, "foo"); + auto 
second_flow_file = createFlowFileWithContent(*content_repo, "bar"); + + std::map<std::string, core::Connectable*> connection_map{{connection->getUUIDStr(), connection.get()}}; + + first_flow_file->setConnection(connection.get()); + first_flow_file->Persist(ff_repo); + + second_flow_file->setConnection(connection.get()); + corruptFlowFile(*second_flow_file); + second_flow_file->Persist(ff_repo); + + ff_repo->setConnectionMap(connection_map); + + CHECK(connection->getQueueSize() == 0); + ff_repo->loadComponent(content_repo); + CHECK(connection->getQueueSize() == expected_flowfiles); +} + } // namespace diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h index 928407e45..15d72b464 100644 --- a/libminifi/include/FlowFileRecord.h +++ b/libminifi/include/FlowFileRecord.h @@ -66,7 +66,7 @@ class FlowFileRecord : public core::FlowFile { static std::shared_ptr<FlowFileRecord> DeSerialize(const std::string& key, const std::shared_ptr<core::Repository>& flowRepository, const std::shared_ptr<core::ContentRepository> &content_repo, utils::Identifier &container); - std::string getContentFullPath() { + std::string getContentFullPath() const { return claim_ ? 
claim_->getContentFullPath() : ""; } diff --git a/libminifi/include/core/StreamManager.h b/libminifi/include/core/StreamManager.h index 8c73169c7..52ff59e0b 100644 --- a/libminifi/include/core/StreamManager.h +++ b/libminifi/include/core/StreamManager.h @@ -67,7 +67,12 @@ class StreamManager { */ virtual std::shared_ptr<io::BaseStream> read(const T &streamId) = 0; - virtual size_t size(const T &streamId) {return read(streamId)->size();} + virtual size_t size(const T &streamId) { + auto stream = read(streamId); + if (!stream) + return 0; + return stream->size(); + } /** * Closes the stream diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h index 160714101..d8c6b50e6 100644 --- a/libminifi/include/core/repository/FileSystemRepository.h +++ b/libminifi/include/core/repository/FileSystemRepository.h @@ -18,11 +18,8 @@ #pragma once -#include <memory> #include <string> #include <string_view> -#include <utility> -#include <algorithm> #include "../ContentRepository.h" #include "properties/Configure.h" @@ -31,21 +28,25 @@ namespace org::apache::nifi::minifi::core::repository { -class FileSystemRepository : public core::ContentRepository { +class FileSystemRepository : public ContentRepository { public: - explicit FileSystemRepository(std::string_view name = className<FileSystemRepository>()) - : core::ContentRepository(name), + explicit FileSystemRepository(const std::string_view name = className<FileSystemRepository>()) + : ContentRepository(name), logger_(logging::LoggerFactory<FileSystemRepository>::getLogger()) { } + FileSystemRepository(FileSystemRepository&&) = delete; + FileSystemRepository(const FileSystemRepository&) = delete; + FileSystemRepository& operator=(FileSystemRepository&&) = delete; + FileSystemRepository& operator=(const FileSystemRepository&) = delete; ~FileSystemRepository() override = default; - bool initialize(const std::shared_ptr<minifi::Configure>& configuration) 
override; - bool exists(const minifi::ResourceClaim& streamId) override; - std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim& claim, bool append = false) override; - std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim& claim) override; + bool initialize(const std::shared_ptr<Configure>& configuration) override; + bool exists(const ResourceClaim& streamId) override; + std::shared_ptr<io::BaseStream> write(const ResourceClaim& claim, bool append = false) override; + std::shared_ptr<io::BaseStream> read(const ResourceClaim& claim) override; - bool close(const minifi::ResourceClaim& claim) override { + bool close(const ResourceClaim& claim) override { return remove(claim); } @@ -57,13 +58,9 @@ class FileSystemRepository : public core::ContentRepository { return utils::file::path_size(directory_); } - uint64_t getRepositoryEntryCount() const override { - auto dir_it = std::filesystem::recursive_directory_iterator(directory_, std::filesystem::directory_options::skip_permission_denied); - return std::count_if( - std::filesystem::begin(dir_it), - std::filesystem::end(dir_it), - [](auto& entry) { return entry.is_regular_file(); }); - } + size_t size(const ResourceClaim& claim) override; + + uint64_t getRepositoryEntryCount() const override; protected: bool removeKey(const std::string& content_path) override; diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h index e50bc3115..ab1c1b81e 100644 --- a/libminifi/include/properties/Configuration.h +++ b/libminifi/include/properties/Configuration.h @@ -199,6 +199,8 @@ class Configuration : public Properties { static constexpr const char *controller_socket_port = "controller.socket.port"; static constexpr const char *controller_ssl_context_service = "controller.ssl.context.service"; + static constexpr const char *nifi_flow_file_repository_check_health = "nifi.flowfile.repository.check.health"; + MINIFIAPI static const 
std::unordered_map<std::string_view, gsl::not_null<const core::PropertyValidator*>> CONFIGURATION_PROPERTIES; MINIFIAPI static const std::array<const char*, 2> DEFAULT_SENSITIVE_PROPERTIES; diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp index cdbab8633..99c69f759 100644 --- a/libminifi/src/core/repository/FileSystemRepository.cpp +++ b/libminifi/src/core/repository/FileSystemRepository.cpp @@ -17,7 +17,6 @@ */ #include "core/repository/FileSystemRepository.h" -#include <memory> #include <string> #include <filesystem> #include "io/FileStream.h" @@ -26,9 +25,8 @@ namespace org::apache::nifi::minifi::core::repository { -bool FileSystemRepository::initialize(const std::shared_ptr<minifi::Configure>& configuration) { - std::string directory_str; - if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, directory_str) && !directory_str.empty()) { +bool FileSystemRepository::initialize(const std::shared_ptr<Configure>& configuration) { + if (std::string directory_str; configuration->get(Configure::nifi_dbcontent_repository_directory_default, directory_str) && !directory_str.empty()) { directory_ = directory_str; } else { directory_ = configuration->getHome().string(); @@ -37,23 +35,23 @@ bool FileSystemRepository::initialize(const std::shared_ptr<minifi::Configure>& return true; } -std::shared_ptr<io::BaseStream> FileSystemRepository::write(const minifi::ResourceClaim& claim, bool append) { +std::shared_ptr<io::BaseStream> FileSystemRepository::write(const ResourceClaim& claim, bool append) { return std::make_shared<io::FileStream>(claim.getContentFullPath(), append); } -bool FileSystemRepository::exists(const minifi::ResourceClaim& streamId) { - std::ifstream file(streamId.getContentFullPath()); +bool FileSystemRepository::exists(const ResourceClaim& streamId) { + const std::ifstream file(streamId.getContentFullPath()); return file.good(); } 
-std::shared_ptr<io::BaseStream> FileSystemRepository::read(const minifi::ResourceClaim& claim) { +std::shared_ptr<io::BaseStream> FileSystemRepository::read(const ResourceClaim& claim) { return std::make_shared<io::FileStream>(claim.getContentFullPath(), 0, false); } bool FileSystemRepository::removeKey(const std::string& content_path) { logger_->log_debug("Deleting resource {}", content_path); std::error_code ec; - auto result = std::filesystem::exists(content_path, ec); + const auto result = std::filesystem::exists(content_path, ec); if (ec) { logger_->log_error("Deleting {} from content repository failed with the following error: {}", content_path, ec.message()); return false; @@ -79,14 +77,13 @@ void FileSystemRepository::clearOrphans() { auto path = directory_ + "/" + filename.string(); bool is_orphan = false; { - std::lock_guard<std::mutex> lock(count_map_mutex_); + std::lock_guard lock(count_map_mutex_); auto it = count_map_.find(path); is_orphan = it == count_map_.end() || it->second == 0; } if (is_orphan) { logger_->log_debug("Deleting orphan resource {}", path); - std::error_code ec; - if (!std::filesystem::remove(path, ec)) { + if (std::error_code ec; !std::filesystem::remove(path, ec)) { { std::lock_guard<std::mutex> lock(purge_list_mutex_); purge_list_.push_back(path); @@ -98,4 +95,20 @@ void FileSystemRepository::clearOrphans() { }, logger_, false); } +size_t FileSystemRepository::size(const ResourceClaim& claim) { + std::error_code ec; + const auto size = std::filesystem::file_size(claim.getContentFullPath(), ec); + if (ec) + return 0; + return size; +} + +uint64_t FileSystemRepository::getRepositoryEntryCount() const { + auto dir_it = std::filesystem::recursive_directory_iterator(directory_, std::filesystem::directory_options::skip_permission_denied); + return std::count_if( + begin(dir_it), + end(dir_it), + [](auto& entry) { return entry.is_regular_file(); }); +} + } // namespace org::apache::nifi::minifi::core::repository