lordgamez commented on code in PR #1945:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1945#discussion_r2032784506


##########
cmake/FetchBenchmark.cmake:
##########
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+include(FetchContent)
+
+FetchContent_Declare(
+        benchmark

Review Comment:
   We should update the LICENSE and NOTICE files to include the new `benchmark` dependency.



##########
libminifi/include/core/repository/LegacyVolatileContentRepository.h:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+#include <string_view>
+
+#include "AtomicRepoEntries.h"
+#include "io/AtomicEntryStream.h"
+#include "core/ContentRepository.h"
+#include "properties/Configure.h"
+#include "core/Connectable.h"
+#include "core/logging/LoggerFactory.h"
+#include "utils/GeneralUtils.h"
+#include "VolatileRepositoryData.h"
+#include "utils/Literals.h"
+
+namespace org::apache::nifi::minifi::core::repository {
+/**
+ * Purpose: Stages content into a volatile area of memory. Note that when the 
maximum number
+ * of entries is consumed we will rollback a session to wait for others to be 
freed.
+ */
+class LegacyVolatileContentRepository : public core::ContentRepositoryImpl {
+ public:
+  static const char *minimal_locking;
+
+  explicit LegacyVolatileContentRepository(std::string_view name = 
className<LegacyVolatileContentRepository>())
+    : core::ContentRepositoryImpl(name),
+      repo_data_(15000, static_cast<size_t>(10_MiB * 0.75)),
+      minimize_locking_(true),
+      
logger_(logging::LoggerFactory<LegacyVolatileContentRepository>::getLogger()) {
+  }
+
+  ~LegacyVolatileContentRepository() override {
+    logger_->log_debug("Clearing repository");
+    if (!minimize_locking_) {
+      std::lock_guard<std::mutex> lock(map_mutex_);
+      for (const auto &item : master_list_) {
+        delete item.second;
+      }
+      master_list_.clear();
+    }
+  }
+
+  uint64_t getRepositorySize() const override {
+    return repo_data_.getRepositorySize();
+  }
+
+  uint64_t getMaxRepositorySize() const override {
+    return repo_data_.getMaxRepositorySize();
+  }
+
+  uint64_t getRepositoryEntryCount() const override {
+    return master_list_.size();
+  }
+
+  bool isFull() const override {
+    return repo_data_.isFull();
+  }
+
+  /**
+   * Initialize the volatile content repo
+   * @param configure configuration
+   */

Review Comment:
   This comment, and the others in this file that add no value, can be removed.



##########
libminifi/src/core/repository/VolatileContentRepository.cpp:
##########
@@ -17,142 +16,133 @@
  */
 
 #include "core/repository/VolatileContentRepository.h"
+#include "core/logging/LoggerFactory.h"
 
-#include <cstdio>
-#include <memory>
-#include <string>
-#include <thread>
+namespace org::apache::nifi::minifi::core::repository {
 
-#include "core/expect.h"
-#include "io/FileStream.h"
-#include "utils/StringUtils.h"
+namespace {
 
-using namespace std::literals::chrono_literals;
+class StringRefStream : public io::BaseStream {
+ public:
+  StringRefStream(std::shared_ptr<std::string> data, std::mutex& 
data_store_mtx, std::shared_ptr<std::string>& data_store, std::atomic<size_t>& 
total_size)
+      : data_(std::move(data)), data_store_mtx_(data_store_mtx), 
data_store_(data_store), total_size_(total_size) {}
 
-namespace org::apache::nifi::minifi::core::repository {
+  [[nodiscard]] size_t size() const override {
+    return data_->size();
+  }
+
+  size_t read(std::span<std::byte> out_buffer) override {
+    auto read_size = std::min(data_->size() - read_offset_, out_buffer.size());
+    std::copy_n(reinterpret_cast<const std::byte*>(data_->data()) + 
read_offset_, read_size, out_buffer.data());
+    read_offset_ += read_size;
+    return read_size;
+  }
 
-const char *VolatileContentRepository::minimal_locking = "minimal.locking";
+  size_t write(const uint8_t *value, size_t len) override {
+    data_ = std::make_shared<std::string>(*data_);
+    data_->append(reinterpret_cast<const char*>(value), len);
+    total_size_ += len;
+    {
+      std::lock_guard lock(data_store_mtx_);
+      data_store_ = data_;
+    }
+    return len;
+  }
 
-bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> 
&configure) {
-  repo_data_.initialize(configure, getName());
+  void close() override {}
 
-  logger_->log_info("Resizing repo_data_.value_vector for {} count is {}", 
getName(), repo_data_.max_count);
-  logger_->log_info("Using a maximum size for {} of {}", getName(), 
repo_data_.max_size);
+  void seek(size_t offset) override {
+    read_offset_ = std::min(offset, data_->size());
+  }
 
-  if (configure != nullptr) {
-    std::string value;
-    std::stringstream strstream;
-    strstream << Configure::nifi_volatile_repository_options << getName() << 
"." << minimal_locking;
-    if (configure->get(strstream.str(), value)) {
-      minimize_locking_ =  utils::string::toBool(value).value_or(true);
-    }
+  size_t tell() const override {
+    return read_offset_;
   }
-  if (!minimize_locking_) {
-    repo_data_.clear();
+
+  int initialize() override {
+    return 1;
+  }
+
+  std::span<const std::byte> getBuffer() const override {
+    return as_bytes(std::span{*data_});
   }
 
+ private:
+  size_t read_offset_{0};
+  std::shared_ptr<std::string> data_;
+  std::mutex& data_store_mtx_;
+  std::shared_ptr<std::string>& data_store_;
+  std::atomic<size_t>& total_size_;
+};
+
+}  // namespace
+
+VolatileContentRepository::VolatileContentRepository(std::string_view name)
+  : ContentRepositoryImpl(name),
+    logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()) {}
+
+uint64_t VolatileContentRepository::getRepositorySize() const {
+  return total_size_.load();
+}
+
+uint64_t VolatileContentRepository::getMaxRepositorySize() const {
+  return std::numeric_limits<uint64_t>::max();
+}
+
+uint64_t VolatileContentRepository::getRepositoryEntryCount() const {
+  std::lock_guard lock(data_mtx_);
+  return data_.size();
+}
+
+bool VolatileContentRepository::isFull() const {
+  return false;
+}
+
+bool VolatileContentRepository::initialize(const std::shared_ptr<Configure>& 
/*configure*/) {
   return true;
 }
 
-std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const 
minifi::ResourceClaim &claim, bool /*append*/) {
-  logger_->log_info("enter write for {}", claim.getContentFullPath());
-  {
-    std::lock_guard<std::mutex> lock(map_mutex_);
-    auto claim_check = master_list_.find(claim.getContentFullPath());
-    if (claim_check != master_list_.end()) {
-      logger_->log_info("Creating copy of atomic entry");
-      auto ent = claim_check->second->takeOwnership();
-      if (ent == nullptr) {
-        return nullptr;
-      }
-      return 
std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(),
 ent);
-    }
+std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const 
minifi::ResourceClaim &claim, bool append) {
+  std::lock_guard lock(data_mtx_);
+  auto& value_ref = data_[claim.getContentFullPath()];
+  if (!value_ref) {
+    value_ref = std::make_shared<std::string>();
+  } else if (!append) {
+    total_size_ -= value_ref->size();
+    value_ref = std::make_shared<std::string>();
   }
+  return std::make_shared<StringRefStream>(value_ref, data_mtx_, value_ref, 
total_size_);
+}
 
-  int size = 0;
-  if (LIKELY(minimize_locking_ == true)) {
-    for (auto ent : repo_data_.value_vector) {
-      if (ent->testAndSetKey(claim.getContentFullPath())) {
-        std::lock_guard<std::mutex> lock(map_mutex_);
-        master_list_[claim.getContentFullPath()] = ent;
-        logger_->log_info("Minimize locking, return stream for {}", 
claim.getContentFullPath());
-        return 
std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(),
 ent);
-      }
-      size++;
-    }
-  } else {
-    std::lock_guard<std::mutex> lock(map_mutex_);
-    auto claim_check = master_list_.find(claim.getContentFullPath());
-    if (claim_check != master_list_.end()) {
-      return 
std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(),
 claim_check->second);
-    } else {
-      auto *ent = new 
AtomicEntry<ResourceClaim::Path>(&repo_data_.current_size, 
&repo_data_.max_size);  // NOLINT(cppcoreguidelines-owning-memory)
-      if (ent->testAndSetKey(claim.getContentFullPath())) {
-        master_list_[claim.getContentFullPath()] = ent;
-        return 
std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(),
 ent);
-      }
-    }
+std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const 
minifi::ResourceClaim &claim) {
+  std::lock_guard lock(data_mtx_);
+  if (auto it = data_.find(claim.getContentFullPath()); it != data_.end()) {
+    return std::make_shared<StringRefStream>(it->second, data_mtx_, 
it->second, total_size_);
   }
-  logger_->log_info("Cannot write {} {}, returning nullptr to roll back 
session. Repo is either full or locked", claim.getContentFullPath(), size);
   return nullptr;
 }
 
 bool VolatileContentRepository::exists(const minifi::ResourceClaim &claim) {
-  std::lock_guard<std::mutex> lock(map_mutex_);
-  auto claim_check = master_list_.find(claim.getContentFullPath());
-  if (claim_check != master_list_.end()) {
-    auto ent = claim_check->second->takeOwnership();
-    return ent != nullptr;
-  }
-
-  return false;
+  std::lock_guard lock(data_mtx_);
+  return data_.contains(claim.getContentFullPath());
 }
 
-std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const 
minifi::ResourceClaim &claim) {
-  std::lock_guard<std::mutex> lock(map_mutex_);
-  auto claim_check = master_list_.find(claim.getContentFullPath());
-  if (claim_check != master_list_.end()) {
-    auto ent = claim_check->second->takeOwnership();
-    if (ent == nullptr) {
-      return nullptr;
-    }
-    return 
std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(),
 ent);
-  }
+bool VolatileContentRepository::close(const minifi::ResourceClaim &claim) {
+  return remove(claim);
+}
 
-  return nullptr;
+void VolatileContentRepository::clearOrphans() {
+  // there are no persisted orphans to delete
 }
 
 bool VolatileContentRepository::removeKey(const std::string& content_path) {
-  if (LIKELY(minimize_locking_ == true)) {
-    std::lock_guard<std::mutex> lock(map_mutex_);
-    auto ent = master_list_.find(content_path);
-    if (ent != master_list_.end()) {
-      auto ptr = ent->second;
-      // if we cannot remove the entry we will let the owner's destructor
-      // decrement the reference count and free it
-      master_list_.erase(content_path);
-      // because of the test and set we need to decrement ownership
-      ptr->decrementOwnership();
-      if (ptr->freeValue(content_path)) {
-        logger_->log_info("Deleting resource {}", content_path);
-      } else {
-        logger_->log_info("free failed for {}", content_path);
-      }
-    } else {
-      logger_->log_info("Could not remove {}", content_path);
-    }
-  } else {
-    std::lock_guard<std::mutex> lock(map_mutex_);
-    auto claim_item = master_list_.find(content_path);
-    if (claim_item != master_list_.end()) {
-      auto size = claim_item->second->getLength();
-      delete claim_item->second;  // NOLINT(cppcoreguidelines-owning-memory)
-      master_list_.erase(content_path);
-      repo_data_.current_size -= size;
-    }
+  std::lock_guard lock(data_mtx_);
+  if (auto it = data_.find(content_path); it != data_.end()) {
+    total_size_ -= it->second->size();
+    data_.erase(it);
+    logger_->log_info("Deleting resource {}", content_path);
   }

Review Comment:
   We could also add a log message for the case where the key is not found.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to