This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 4fb71dee2cf2401659df95d8a7f2a226c3ab2568
Author: Adam Debreceni <adebrec...@apache.org>
AuthorDate: Wed Feb 8 07:36:23 2023 +0100

    MINIFICPP-2040 - Store content claim for flow file deletion
    
    Signed-off-by: Gabor Gyimesi <gamezb...@gmail.com>
    
    This closes #1508
---
 extensions/rocksdb-repos/FlowFileRepository.cpp | 106 +++++++++++++++---------
 extensions/rocksdb-repos/FlowFileRepository.h   |  11 ++-
 libminifi/include/core/Repository.h             |   4 +
 3 files changed, 81 insertions(+), 40 deletions(-)

diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 7568201e3..36999c4e0 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -41,52 +41,71 @@ void FlowFileRepository::flush() {
     return;
   }
   auto batch = opendb->createWriteBatch();
-  rocksdb::ReadOptions options;
 
-  std::vector<std::shared_ptr<FlowFile>> purgeList;
+  std::list<ExpiredFlowFileInfo> flow_files;
 
-  std::vector<rocksdb::Slice> keys;
-  std::list<std::string> keystrings;
-  std::vector<std::string> values;
-
-  while (keys_to_delete.size_approx() > 0) {
-    std::string key;
-    if (keys_to_delete.try_dequeue(key)) {
-      keystrings.push_back(std::move(key));  // rocksdb::Slice doesn't copy the string, only grabs ptrs. Hacky, but have to ensure the required lifetime of the strings.
-      keys.push_back(keystrings.back());
+  while (keys_to_delete_.size_approx() > 0) {
+    ExpiredFlowFileInfo info;
+    if (keys_to_delete_.try_dequeue(info)) {
+      flow_files.push_back(std::move(info));
     }
   }
-  auto multistatus = opendb->MultiGet(options, keys, &values);
 
-  for (size_t i = 0; i < keys.size() && i < values.size() && i < multistatus.size(); ++i) {
-    if (!multistatus[i].ok()) {
-      logger_->log_error("Failed to read key from rocksdb: %s! DB is most probably in an inconsistent state!", keys[i].data());
-      keystrings.remove(keys[i].data());
-      continue;
-    }
+  deserializeFlowFilesWithNoContentClaim(opendb.value(), flow_files);
 
-    utils::Identifier containerId;
-    auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(values[i]).as_span<const std::byte>(), content_repo_, containerId);
-    if (eventRead) {
-      purgeList.push_back(eventRead);
-    }
-    logger_->log_debug("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath());
-    batch.Delete(keys[i]);
+  for (auto& ff : flow_files) {
+    batch.Delete(ff.key);
+    logger_->log_debug("Issuing batch delete, including %s, Content path %s", ff.key, ff.content ? ff.content->getContentFullPath() : "null");
   }
 
   auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); };
 
   if (!ExecuteWithRetry(operation)) {
-    for (const auto& key : keystrings) {
-      keys_to_delete.enqueue(key);  // Push back the values that we could get but couldn't delete
+    for (auto&& ff : flow_files) {
+      keys_to_delete_.enqueue(std::move(ff));
     }
     return;  // Stop here - don't delete from content repo while we have records in FF repo
   }
 
   if (content_repo_) {
-    for (const auto &ffr : purgeList) {
-      auto claim = ffr->getResourceClaim();
-      if (claim) claim->decreaseFlowFileRecordOwnedCount();
+    for (auto& ff : flow_files) {
+      if (ff.content) {
+        ff.content->decreaseFlowFileRecordOwnedCount();
+      }
+    }
+  }
+}
+
+void FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal::OpenRocksDb& opendb, std::list<ExpiredFlowFileInfo>& flow_files) {
+  std::vector<rocksdb::Slice> keys;
+  std::vector<std::list<ExpiredFlowFileInfo>::iterator> key_positions;
+  for (auto it = flow_files.begin(); it != flow_files.end(); ++it) {
+    if (!it->content) {
+      keys.push_back(it->key);
+      key_positions.push_back(it);
+    }
+  }
+  if (keys.empty()) {
+    return;
+  }
+  std::vector<std::string> values;
+  auto multistatus = opendb.MultiGet(rocksdb::ReadOptions{}, keys, &values);
+  gsl_Expects(keys.size() == values.size() && values.size() == multistatus.size());
+
+  for (size_t i = 0; i < keys.size(); ++i) {
+    if (!multistatus[i].ok()) {
+      logger_->log_error("Failed to read key from rocksdb: %s! DB is most probably in an inconsistent state!", keys[i].data());
+      flow_files.erase(key_positions.at(i));
+      continue;
+    }
+
+    utils::Identifier container_id;
+    auto flow_file = FlowFileRecord::DeSerialize(gsl::make_span(values[i]).as_span<const std::byte>(), content_repo_, container_id);
+    if (flow_file) {
+      gsl_Expects(flow_file->getUUIDStr() == key_positions.at(i)->key);
+      key_positions.at(i)->content = flow_file->getResourceClaim();
+    } else {
+      logger_->log_error("Could not deserialize flow file %s", key_positions.at(i)->key);
     }
   }
 }
@@ -149,34 +168,34 @@ void FlowFileRepository::initialize_repository() {
 
   auto it = opendb->NewIterator(rocksdb::ReadOptions());
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
-    utils::Identifier containerId;
-    auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>(), content_repo_, containerId);
+    utils::Identifier container_id;
+    auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>(), content_repo_, container_id);
     std::string key = it->key().ToString();
     if (eventRead) {
       // on behalf of the just resurrected persisted instance
       auto claim = eventRead->getResourceClaim();
       if (claim) claim->increaseFlowFileRecordOwnedCount();
       bool found = false;
-      auto search = containers_.find(containerId.to_string());
+      auto search = containers_.find(container_id.to_string());
       found = (search != containers_.end());
       if (!found) {
         // for backward compatibility
-        search = connection_map_.find(containerId.to_string());
+        search = connection_map_.find(container_id.to_string());
         found = (search != connection_map_.end());
       }
       if (found) {
-        logger_->log_debug("Found connection for %s, path %s ", containerId.to_string(), eventRead->getContentFullPath());
+        logger_->log_debug("Found connection for %s, path %s ", container_id.to_string(), eventRead->getContentFullPath());
         eventRead->setStoredToRepository(true);
         // we found the connection for the persistent flowFile
         // even if a processor immediately marks it for deletion, flush only happens after prune_stored_flowfiles
         search->second->restore(eventRead);
       } else {
-        logger_->log_warn("Could not find connection for %s, path %s ", containerId.to_string(), eventRead->getContentFullPath());
-        keys_to_delete.enqueue(key);
+        logger_->log_warn("Could not find connection for %s, path %s ", container_id.to_string(), eventRead->getContentFullPath());
+        keys_to_delete_.enqueue({.key = key, .content = eventRead->getResourceClaim()});
       }
     } else {
       // failed to deserialize FlowFile, cannot clear claim
-      keys_to_delete.enqueue(key);
+      keys_to_delete_.enqueue({.key = key});
     }
   }
   flush();
@@ -287,7 +306,7 @@ bool FlowFileRepository::MultiPut(const std::vector<std::pair<std::string, std::
 }
 
 bool FlowFileRepository::Delete(const std::string& key) {
-  keys_to_delete.enqueue(key);
+  keys_to_delete_.enqueue({.key = key});
   return true;
 }
 
@@ -340,6 +359,15 @@ std::future<std::vector<std::shared_ptr<core::FlowFile>>> FlowFileRepository::lo
   return swap_loader_->load(std::move(flow_files));
 }
 
+bool FlowFileRepository::Delete(const std::shared_ptr<core::CoreComponent>& item) {
+  if (auto ff = std::dynamic_pointer_cast<core::FlowFile>(item)) {
+    keys_to_delete_.enqueue({.key = item->getUUIDStr(), .content = ff->getResourceClaim()});
+  } else {
+    keys_to_delete_.enqueue({.key = item->getUUIDStr()});
+  }
+  return true;
+}
+
 REGISTER_RESOURCE_AS(FlowFileRepository, InternalResource, ("FlowFileRepository", "flowfilerepository"));
 
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index e90081221..68959ce88 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -20,6 +20,7 @@
 #include <vector>
 #include <string>
 #include <memory>
+#include <list>
 
 #include "utils/file/FileUtils.h"
 #include "rocksdb/db.h"
@@ -61,6 +62,11 @@ constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = std::chrono::mill
 class FlowFileRepository : public ThreadedRepository, public SwapManager {
   static constexpr std::chrono::milliseconds DEFAULT_COMPACTION_PERIOD = std::chrono::minutes{2};
 
+  struct ExpiredFlowFileInfo {
+    std::string key;
+    std::shared_ptr<ResourceClaim> content{};
+  };
+
  public:
   static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.flowfile.repository.encryption.key";
 
@@ -98,6 +104,7 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
   bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override;
   bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override;
   bool Delete(const std::string& key) override;
+  bool Delete(const std::shared_ptr<core::CoreComponent>& item) override;
   bool Get(const std::string &key, std::string &value) override;
 
   void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) override;
@@ -116,11 +123,13 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
 
   void initialize_repository();
 
+  void deserializeFlowFilesWithNoContentClaim(minifi::internal::OpenRocksDb& opendb, std::list<ExpiredFlowFileInfo>& flow_files);
+
   std::thread& getThread() override {
     return thread_;
   }
 
-  moodycamel::ConcurrentQueue<std::string> keys_to_delete;
+  moodycamel::ConcurrentQueue<ExpiredFlowFileInfo> keys_to_delete_;
   std::shared_ptr<core::ContentRepository> content_repo_;
   std::unique_ptr<minifi::internal::RocksDatabase> db_;
   std::unique_ptr<FlowFileLoader> swap_loader_;
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index 2bcf70e5d..77fdfbec3 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -98,6 +98,10 @@ class Repository : public core::CoreComponent {
     return true;
   }
 
+  virtual bool Delete(const std::shared_ptr<core::CoreComponent>& item) {
+    return Delete(item->getUUIDStr());
+  }
+
   virtual bool Delete(std::vector<std::shared_ptr<core::SerializableComponent>> &storedValues);
 
   void setConnectionMap(std::map<std::string, core::Connectable*> connectionMap) {

Reply via email to