Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/110#discussion_r123120308 --- Diff: libminifi/include/core/repository/AtomicRepoEntries.h --- @@ -0,0 +1,501 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ref_count_hip. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_ +#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_ + +#include <cstddef> +#include <cstring> +#include <iostream> +#include <chrono> +#include <functional> +#include <atomic> +#include <vector> +#include <map> +#include <iterator> + +static uint16_t accounting_size = sizeof(std::vector<uint8_t>) + sizeof(std::string) + sizeof(size_t); + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +/** + * Purpose: Repo value represents an item that will support a move operation within an AtomicEntry + * + * Justification: Since AtomicEntry is a static entry that does not move or change, the underlying + * RepoValue can be changed to support atomic operations. + */ +template<typename T> +class RepoValue { + public: + + explicit RepoValue() { + } + + /** + * Constructor that populates the item allowing for a custom key comparator. + * @param key key for this repo value. + * @param ptr buffer + * @param size size buffer + * @param comparator custom comparator. + */ + explicit RepoValue(T key, const uint8_t *ptr, size_t size, std::function<bool(T, T)> comparator = nullptr) + : key_(key), + comparator_(comparator) { + if (nullptr == ptr) { + size = 0; + } + buffer_.resize(size); + if (size > 0) { + std::memcpy(buffer_.data(), ptr, size); + } + } + + /** + * RepoValue that moves the other object into this. + */ + explicit RepoValue(RepoValue<T> &&other) +noexcept : key_(std::move(other.key_)), + buffer_(std::move(other.buffer_)), + comparator_(std::move(other.comparator_)) { + } + + ~RepoValue() + { + } + + T &getKey() { + return key_; + } + + /** + * Sets the key, relacing the custom comparator if needed. + */ + void setKey(const T key, std::function<bool(T,T)> comparator = nullptr) { + key_ = key; + comparator_ = comparator; + } + + /** + * Determines if the key is the same using the custom comparator + * @param other object to compare against + * @return result of the comparison + */ + inline bool isEqual(RepoValue<T> *other) + { + return comparator_ == nullptr ? key_ == other->key_ : comparator_(key_,other->key_); + } + + /** + * Determines if the key is the same using the custom comparator + * @param other object to compare against + * @return result of the comparison + */ + inline bool isKey(T other) + { + return comparator_ == nullptr ? key_ == other : comparator_(key_,other); + } + + /** + * Clears the buffer. + */ + void clearBuffer() { + buffer_.resize(0); + buffer_.clear(); + } + + /** + * Return the size of the memory within the key + * buffer, the size of timestamp, and the general + * system word size + */ + uint64_t size() { + return buffer_.size(); + } + + size_t getBufferSize() { + return buffer_.size(); + } + + const uint8_t *getBuffer() + { + return buffer_.data(); + } + + /** + * Places the contents of buffer into str + * @param strnig into which we are placing the memory contained in buffer. + */ + void emplace(std::string &str) { + str.insert(0, reinterpret_cast<const char*>(buffer_.data()), buffer_.size()); + } + + /** + * Appends ptr to the end of buffer. + * @param ptr pointer containing data to add to buffer_ + */ + void append(uint8_t *ptr, size_t size) + { + buffer_.insert(buffer_.end(), ptr, ptr + size); + } + + RepoValue<T> &operator=(RepoValue<T> &&other) noexcept { + key_ = std::move(other.key_); + buffer_ = std::move(other.buffer_); + return *this; + } + + private: + T key_; + std::function<bool(T,T)> comparator_; + std::vector<uint8_t> buffer_; + }; + + /** + * Purpose: Atomic Entry allows us to create a statically + * sized ring buffer, with the ability to create + * + **/ +template<typename T> +class AtomicEntry { + + public: + /** + * Constructor that accepts a max size and an atomic counter for the total + * size allowd by this and other atomic entries. + */ + explicit AtomicEntry(std::atomic<size_t> *total_size, size_t *max_size) + : write_pending_(false), + has_value_(false), + total_size_(total_size), + max_size_(max_size), + ref_count_(0), + free_required(false) { + + } + + /** + * Sets the repo value, moving the old value into old_value. + * @param new_value new value to move into value_. + * @param old_value the previous value of value_ will be moved into old_value + * @param prev_size size reclaimed. + * @return result of this set. If true old_value will be populated. + */ + bool setRepoValue(RepoValue<T> &new_value, RepoValue<T> &old_value, size_t &prev_size) { + // delete the underlying pointer + bool lock = false; + if (!write_pending_.compare_exchange_weak(lock, true)) + { + return false; + } + if (has_value_) { + prev_size = value_.size(); + } + old_value = std::move(value_); + value_ = std::move(new_value); + has_value_ = true; + try_unlock(); + return true; + } + + + AtomicEntry<T> *takeOwnership() + { + bool lock = false; + if (!write_pending_.compare_exchange_weak(lock, true) ) + return nullptr; + + ref_count_++; + + try_unlock(); + + return this; + } + /** + * A test and set operation, which is used to allow a function to test + * if an item can be released and a function used for reclaiming memory associated + * with said object. + * A custom comparator can be provided to augment the key being added into value_ + */ + bool testAndSetKey(const T str, std::function<bool(T)> releaseTest = nullptr, std::function<void(T)> reclaimer = nullptr, std::function<bool(T, T)> comparator = nullptr) { + bool lock = false; + + if (!write_pending_.compare_exchange_weak(lock, true) ) + return false; + + if (has_value_) { + // we either don't have a release test or we cannot release this + // entity + if (releaseTest != nullptr && reclaimer != nullptr && releaseTest(value_.getKey())) + { + reclaimer(value_.getKey()); + } + else if (free_required && ref_count_ == 0) + { + size_t bufferSize = value_.getBufferSize(); + value_.clearBuffer(); + has_value_ = false; + if (total_size_ != nullptr) { + *total_size_ -= bufferSize; + } + free_required = false; + } + else { + try_unlock(); + return false; + } + + } + ref_count_=1; + value_.setKey(str, comparator); + has_value_ = true; + try_unlock(); + return true; + } + + /** + * Moved the value into the argument + * @param value the previous value will be moved into this parameter + * @return success of get operation based on whether or not this atomic entry has a value. + */ + bool getValue(RepoValue<T> &value) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + value = std::move(value_); + has_value_ = false; + try_unlock(); + return true; + } + + /** + * Moved the value into the argument + * @param value the previous value will be moved into this parameter + * @return success of get operation based on whether or not this atomic entry has a value. + */ + bool getValue(const T &key, RepoValue<T> &value) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + if (!value_.isKey(key)) { + try_unlock(); + return false; + } + value = std::move(value_); + has_value_ = false; + try_unlock(); + return true; + } + + void decrementOwnership(){ + try_lock(); + if (!has_value_) { + try_unlock(); + return; + } + if (ref_count_ > 0){ + ref_count_--; + } + if (ref_count_ == 0 && free_required) + { + size_t bufferSize = value_.getBufferSize(); + value_.clearBuffer(); + has_value_ = false; + if (total_size_ != nullptr) { + *total_size_ -= bufferSize; + } + free_required = false; + } + else{ + } + try_unlock(); + } + + /** + * Moved the value into the argument + * @param value the previous value will be moved into this parameter + * @return success of get operation based on whether or not this atomic entry has a value. + */ + bool getValue(const T &key, RepoValue<T> **value) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + if (!value_.isKey(key)) { + try_unlock(); + return false; + } + ref_count_++; + *value = &value_; + try_unlock(); + return true; + } + + /** + * Operation that will be used to test and free if a release is required without + * setting a new object. + * @param releaseTest function that will be used to free the RepoValue key from + * this atomic entry. + * @param freedValue informs the caller if an item was freed. + */ + T testAndFree(std::function<bool(T)> releaseTest, bool &freedValue) { + try_lock(); + T ref; + if (!has_value_) { + try_unlock(); + return ref; + } + + if (releaseTest(value_.getKey())) { + size_t bufferSize = value_.getBufferSize(); + value_.clearBuffer(); + ref = value_.getKey(); + has_value_ = false; + if (total_size_ != nullptr) { + *total_size_ -= bufferSize; + } + + } + try_unlock(); + return ref; + } + + size_t getLength() + { + size_t size = 0; + try_lock(); + size = value_.getBufferSize(); + try_unlock(); + return size; + + } + + /** + * sets has_value to false; however, does not call + * any external entity to further free RepoValue + */ + bool freeValue(const T &key) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + if (!value_.isKey(key)) { + try_unlock(); + return false; + } + if (ref_count_ > 0) + { + free_required = true; + try_unlock(); + return true; + } + size_t bufferSize = value_.getBufferSize(); + value_.clearBuffer(); + has_value_ = false; + if (total_size_ != nullptr) { + *total_size_ -= bufferSize; + } + free_required = false; + try_unlock(); + return true; + } + + /** + * Appends buffer onto this atomic entry if key matches + * the current RepoValue's key. + */ + bool insert(const T key, uint8_t *buffer, size_t size) { + try_lock(); + + if (!has_value_) { + try_unlock(); + return false; + } + + if (!value_.isKey(key)) { + try_unlock(); + return false; + } + + if ((total_size_ != nullptr && max_size_ != nullptr) && (*total_size_ + size > *max_size_)) { --- End diff -- These are pointers to a global max size for that volatile repo? Is it confusing? I can rename it to make it clearer.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---