This is an automated email from the ASF dual-hosted git repository. lordgamez pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 4fb71dee2cf2401659df95d8a7f2a226c3ab2568 Author: Adam Debreceni <adebrec...@apache.org> AuthorDate: Wed Feb 8 07:36:23 2023 +0100 MINIFICPP-2040 - Store content claim for flow file deletion Signed-off-by: Gabor Gyimesi <gamezb...@gmail.com> This closes #1508 --- extensions/rocksdb-repos/FlowFileRepository.cpp | 106 +++++++++++++++--------- extensions/rocksdb-repos/FlowFileRepository.h | 11 ++- libminifi/include/core/Repository.h | 4 + 3 files changed, 81 insertions(+), 40 deletions(-) diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp index 7568201e3..36999c4e0 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.cpp +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -41,52 +41,71 @@ void FlowFileRepository::flush() { return; } auto batch = opendb->createWriteBatch(); - rocksdb::ReadOptions options; - std::vector<std::shared_ptr<FlowFile>> purgeList; + std::list<ExpiredFlowFileInfo> flow_files; - std::vector<rocksdb::Slice> keys; - std::list<std::string> keystrings; - std::vector<std::string> values; - - while (keys_to_delete.size_approx() > 0) { - std::string key; - if (keys_to_delete.try_dequeue(key)) { - keystrings.push_back(std::move(key)); // rocksdb::Slice doesn't copy the string, only grabs ptrs. Hacky, but have to ensure the required lifetime of the strings. - keys.push_back(keystrings.back()); + while (keys_to_delete_.size_approx() > 0) { + ExpiredFlowFileInfo info; + if (keys_to_delete_.try_dequeue(info)) { + flow_files.push_back(std::move(info)); } } - auto multistatus = opendb->MultiGet(options, keys, &values); - for (size_t i = 0; i < keys.size() && i < values.size() && i < multistatus.size(); ++i) { - if (!multistatus[i].ok()) { - logger_->log_error("Failed to read key from rocksdb: %s! DB is most probably in an inconsistent state!", keys[i].data()); - keystrings.remove(keys[i].data()); - continue; - } + deserializeFlowFilesWithNoContentClaim(opendb.value(), flow_files); - utils::Identifier containerId; - auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(values[i]).as_span<const std::byte>(), content_repo_, containerId); - if (eventRead) { - purgeList.push_back(eventRead); - } - logger_->log_debug("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath()); - batch.Delete(keys[i]); + for (auto& ff : flow_files) { + batch.Delete(ff.key); + logger_->log_debug("Issuing batch delete, including %s, Content path %s", ff.key, ff.content ? ff.content->getContentFullPath() : "null"); } auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); }; if (!ExecuteWithRetry(operation)) { - for (const auto& key : keystrings) { - keys_to_delete.enqueue(key); // Push back the values that we could get but couldn't delete + for (auto&& ff : flow_files) { + keys_to_delete_.enqueue(std::move(ff)); } return; // Stop here - don't delete from content repo while we have records in FF repo } if (content_repo_) { - for (const auto &ffr : purgeList) { - auto claim = ffr->getResourceClaim(); - if (claim) claim->decreaseFlowFileRecordOwnedCount(); + for (auto& ff : flow_files) { + if (ff.content) { + ff.content->decreaseFlowFileRecordOwnedCount(); + } + } + } +} + +void FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal::OpenRocksDb& opendb, std::list<ExpiredFlowFileInfo>& flow_files) { + std::vector<rocksdb::Slice> keys; + std::vector<std::list<ExpiredFlowFileInfo>::iterator> key_positions; + for (auto it = flow_files.begin(); it != flow_files.end(); ++it) { + if (!it->content) { + keys.push_back(it->key); + key_positions.push_back(it); + } + } + if (keys.empty()) { + return; + } + std::vector<std::string> values; + auto multistatus = opendb.MultiGet(rocksdb::ReadOptions{}, keys, &values); + gsl_Expects(keys.size() == values.size() && values.size() == multistatus.size()); + + for (size_t i = 0; i < keys.size(); ++i) { + if (!multistatus[i].ok()) { + logger_->log_error("Failed to read key from rocksdb: %s! DB is most probably in an inconsistent state!", keys[i].data()); + flow_files.erase(key_positions.at(i)); + continue; + } + + utils::Identifier container_id; + auto flow_file = FlowFileRecord::DeSerialize(gsl::make_span(values[i]).as_span<const std::byte>(), content_repo_, container_id); + if (flow_file) { + gsl_Expects(flow_file->getUUIDStr() == key_positions.at(i)->key); + key_positions.at(i)->content = flow_file->getResourceClaim(); + } else { + logger_->log_error("Could not deserialize flow file %s", key_positions.at(i)->key); } } } @@ -149,34 +168,34 @@ void FlowFileRepository::initialize_repository() { auto it = opendb->NewIterator(rocksdb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { - utils::Identifier containerId; - auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>(), content_repo_, containerId); + utils::Identifier container_id; + auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>(), content_repo_, container_id); std::string key = it->key().ToString(); if (eventRead) { // on behalf of the just resurrected persisted instance auto claim = eventRead->getResourceClaim(); if (claim) claim->increaseFlowFileRecordOwnedCount(); bool found = false; - auto search = containers_.find(containerId.to_string()); + auto search = containers_.find(container_id.to_string()); found = (search != containers_.end()); if (!found) { // for backward compatibility - search = connection_map_.find(containerId.to_string()); + search = connection_map_.find(container_id.to_string()); found = (search != connection_map_.end()); } if (found) { - logger_->log_debug("Found connection for %s, path %s ", containerId.to_string(), eventRead->getContentFullPath()); + logger_->log_debug("Found connection for %s, path %s ", container_id.to_string(), eventRead->getContentFullPath()); eventRead->setStoredToRepository(true); // we found the connection for the persistent flowFile // even if a processor immediately marks it for deletion, flush only happens after prune_stored_flowfiles search->second->restore(eventRead); } else { - logger_->log_warn("Could not find connection for %s, path %s ", containerId.to_string(), eventRead->getContentFullPath()); - keys_to_delete.enqueue(key); + logger_->log_warn("Could not find connection for %s, path %s ", container_id.to_string(), eventRead->getContentFullPath()); + keys_to_delete_.enqueue({.key = key, .content = eventRead->getResourceClaim()}); } } else { // failed to deserialize FlowFile, cannot clear claim - keys_to_delete.enqueue(key); + keys_to_delete_.enqueue({.key = key}); } } flush(); @@ -287,7 +306,7 @@ bool FlowFileRepository::MultiPut(const std::vector<std::pair<std::string, std:: } bool FlowFileRepository::Delete(const std::string& key) { - keys_to_delete.enqueue(key); + keys_to_delete_.enqueue({.key = key}); return true; } @@ -340,6 +359,15 @@ std::future<std::vector<std::shared_ptr<core::FlowFile>>> FlowFileRepository::lo return swap_loader_->load(std::move(flow_files)); } +bool FlowFileRepository::Delete(const std::shared_ptr<core::CoreComponent>& item) { + if (auto ff = std::dynamic_pointer_cast<core::FlowFile>(item)) { + keys_to_delete_.enqueue({.key = item->getUUIDStr(), .content = ff->getResourceClaim()}); + } else { + keys_to_delete_.enqueue({.key = item->getUUIDStr()}); + } + return true; +} + REGISTER_RESOURCE_AS(FlowFileRepository, InternalResource, ("FlowFileRepository", "flowfilerepository")); } // namespace org::apache::nifi::minifi::core::repository diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h index e90081221..68959ce88 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.h +++ b/extensions/rocksdb-repos/FlowFileRepository.h @@ -20,6 +20,7 @@ #include <vector> #include <string> #include <memory> +#include <list> #include "utils/file/FileUtils.h" #include "rocksdb/db.h" @@ -61,6 +62,11 @@ constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = std::chrono::mill class FlowFileRepository : public ThreadedRepository, public SwapManager { static constexpr std::chrono::milliseconds DEFAULT_COMPACTION_PERIOD = std::chrono::minutes{2}; + struct ExpiredFlowFileInfo { + std::string key; + std::shared_ptr<ResourceClaim> content{}; + }; + public: static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.flowfile.repository.encryption.key"; @@ -98,6 +104,7 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager { bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override; bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override; bool Delete(const std::string& key) override; + bool Delete(const std::shared_ptr<core::CoreComponent>& item) override; bool Get(const std::string &key, std::string &value) override; void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) override; @@ -116,11 +123,13 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager { void initialize_repository(); + void deserializeFlowFilesWithNoContentClaim(minifi::internal::OpenRocksDb& opendb, std::list<ExpiredFlowFileInfo>& flow_files); + std::thread& getThread() override { return thread_; } - moodycamel::ConcurrentQueue<std::string> keys_to_delete; + moodycamel::ConcurrentQueue<ExpiredFlowFileInfo> keys_to_delete_; std::shared_ptr<core::ContentRepository> content_repo_; std::unique_ptr<minifi::internal::RocksDatabase> db_; std::unique_ptr<FlowFileLoader> swap_loader_; diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h index 2bcf70e5d..77fdfbec3 100644 --- a/libminifi/include/core/Repository.h +++ b/libminifi/include/core/Repository.h @@ -98,6 +98,10 @@ class Repository : public core::CoreComponent { return true; } + virtual bool Delete(const std::shared_ptr<core::CoreComponent>& item) { + return Delete(item->getUUIDStr()); + } + virtual bool Delete(std::vector<std::shared_ptr<core::SerializableComponent>> &storedValues); void setConnectionMap(std::map<std::string, core::Connectable*> connectionMap) {