Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 9f98a407c -> d26d65af4
MINIFI-37: Create a volatile repository and config items for NoOp and Volatile repository configuration Allow the max count of the volatile repository size to be configurable This closes #98. Signed-off-by: Aldrin Piri <ald...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/d26d65af Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/d26d65af Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/d26d65af Branch: refs/heads/master Commit: d26d65af4448ddb054975601a9bf59f8660e6a8d Parents: 9f98a40 Author: Marc Parisi <phroc...@apache.org> Authored: Fri May 12 15:22:06 2017 -0400 Committer: Aldrin Piri <ald...@apache.org> Committed: Wed May 24 09:23:39 2017 -0400 ---------------------------------------------------------------------- README.md | 12 + libminifi/include/core/Core.h | 15 +- libminifi/include/core/Repository.h | 15 +- libminifi/include/core/RepositoryFactory.h | 3 +- libminifi/include/core/logging/BaseLogger.h | 3 +- .../core/repository/FlowFileRepository.h | 12 +- .../core/repository/VolatileRepository.h | 372 +++++++++++++++++++ libminifi/include/properties/Configure.h | 1 + .../include/provenance/ProvenanceRepository.h | 4 +- libminifi/src/Configure.cpp | 2 + libminifi/src/FlowController.cpp | 6 +- libminifi/src/core/RepositoryFactory.cpp | 14 +- .../src/core/repository/VolatileRepository.cpp | 66 ++++ libminifi/test/unit/ProvenanceTestHelper.h | 2 +- libminifi/test/unit/ProvenanceTests.cpp | 86 ++++- libminifi/test/unit/RepoTests.cpp | 9 +- main/MiNiFiMain.cpp | 8 +- 17 files changed, 597 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index df89fb8..1457350 100644 --- a/README.md +++ b/README.md @@ -299,7 +299,19 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc if you do not want to enable client certificate base authorization nifi.security.need.ClientAuth=false + +### Configuring Volatile and NO-OP Repositories + in minifi.properties + + # For Volatile Repositories: + nifi.flow.repository.class.name=VolatileRepository + nifi.provenance.repository.class.name=VolatileRepository + + # For NO-OP Repositories: + nifi.flow.repository.class.name=NoOpRepository + nifi.provenance.repository.class.name=NoOpRepository + ### Provenance Report Add Provenance Reporting to config.yml http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/core/Core.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h index 453a6a5..335f306 100644 --- a/libminifi/include/core/Core.h +++ b/libminifi/include/core/Core.h @@ -40,8 +40,10 @@ namespace core { template<typename T> static inline std::string getClassName() { char *b = abi::__cxa_demangle(typeid(T).name(), 0, 0, 0); + if (b == nullptr) + return std::string(); std::string name = b; - delete[] b; + std::free(b); return name; } @@ -64,13 +66,18 @@ struct class_operations { }; template<typename T> -typename std::enable_if<!class_operations<T>::value, std::shared_ptr<T>>::type instantiate() { +typename std::enable_if<!class_operations<T>::value, std::shared_ptr<T>>::type instantiate(const std::string name = "") { throw std::runtime_error("Cannot instantiate class"); } template<typename T> -typename std::enable_if<class_operations<T>::value, std::shared_ptr<T>>::type instantiate() { - return std::make_shared<T>(); +typename std::enable_if<class_operations<T>::value, std::shared_ptr<T>>::type instantiate(const std::string name = "") { + if (name.length() == 0){ + return std::make_shared<T>(); + } + else{ + return std::make_shared<T>(name); + } } /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/core/Repository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h index 48ccc47..94517d8 100644 --- a/libminifi/include/core/Repository.h +++ b/libminifi/include/core/Repository.h @@ -47,14 +47,21 @@ namespace nifi { namespace minifi { namespace core { +#define REPOSITORY_DIRECTORY "./repo" +#define MAX_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M +#define MAX_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute +#define REPOSITORY_PURGE_PERIOD (2500) // 2500 msec + class Repository : public CoreComponent { public: /* * Constructor for the repository */ - Repository(std::string repo_name, std::string directory, - int64_t maxPartitionMillis, int64_t maxPartitionBytes, - uint64_t purgePeriod) + Repository(std::string repo_name="Repository", + std::string directory = REPOSITORY_DIRECTORY, + int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, + int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE, + uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) : CoreComponent(repo_name), thread_() { directory_ = directory; @@ -84,7 +91,7 @@ class Repository : public CoreComponent { } virtual bool Get(std::string key, std::string &value) { - return true; + return false; } // Run function for the thread http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/core/RepositoryFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/RepositoryFactory.h b/libminifi/include/core/RepositoryFactory.h index ed9a026..db474a0 100644 --- a/libminifi/include/core/RepositoryFactory.h +++ b/libminifi/include/core/RepositoryFactory.h @@ -20,6 +20,7 @@ #define LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_ #include "core/Repository.h" +#include "core/repository/VolatileRepository.h" #include "Core.h" namespace org { @@ -30,7 +31,7 @@ namespace minifi { namespace core { std::shared_ptr<core::Repository> createRepository( - const std::string configuration_class_name, bool fail_safe = false); + const std::string configuration_class_name, bool fail_safe = false,const std::string repo_name = ""); } /* namespace core */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/core/logging/BaseLogger.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/logging/BaseLogger.h b/libminifi/include/core/logging/BaseLogger.h index 9d00fb6..904bac2 100644 --- a/libminifi/include/core/logging/BaseLogger.h +++ b/libminifi/include/core/logging/BaseLogger.h @@ -54,7 +54,8 @@ typedef enum { template<typename ... Args> inline std::string format_string(char const* format_str, Args&&... args) { - char buf[LOG_BUFFER_SIZE]; + char buf[LOG_BUFFER_SIZE+1] = {0}; + std::snprintf(buf, LOG_BUFFER_SIZE, format_str, args...); return std::string(buf); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/core/repository/FlowFileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/FlowFileRepository.h b/libminifi/include/core/repository/FlowFileRepository.h index 9fc13e0..051dfb0 100644 --- a/libminifi/include/core/repository/FlowFileRepository.h +++ b/libminifi/include/core/repository/FlowFileRepository.h @@ -47,21 +47,15 @@ class FlowFileRepository : public core::Repository, public: // Constructor - FlowFileRepository(std::string directory, int64_t maxPartitionMillis, - int64_t maxPartitionBytes, uint64_t purgePeriod) - : Repository(core::getClassName<FlowFileRepository>(), directory, + FlowFileRepository(const std::string repo_name = "", std::string directory=FLOWFILE_REPOSITORY_DIRECTORY, int64_t maxPartitionMillis=MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, + int64_t maxPartitionBytes=MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod=FLOWFILE_REPOSITORY_PURGE_PERIOD) + : Repository(repo_name.length() > 0 ? repo_name : core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod) { db_ = NULL; } - FlowFileRepository() - : FlowFileRepository(FLOWFILE_REPOSITORY_DIRECTORY, - MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, - MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, - FLOWFILE_REPOSITORY_PURGE_PERIOD) { - } // Destructor ~FlowFileRepository() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/core/repository/VolatileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h new file mode 100644 index 0000000..1e07e28 --- /dev/null +++ b/libminifi/include/core/repository/VolatileRepository.h @@ -0,0 +1,372 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_ +#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_ + +#include "core/Repository.h" +#include <chrono> +#include <vector> +#include <map> +#include "core/Core.h" +#include "Connection.h" +#include "utils/StringUtils.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +static uint16_t accounting_size = sizeof(std::vector<uint8_t>) + + sizeof(std::string) + sizeof(size_t); + +class RepoValue { + public: + + explicit RepoValue() { + } + + explicit RepoValue(std::string key, uint8_t *ptr, size_t size) + : key_(key) { + buffer_.resize(size); + std::memcpy(buffer_.data(), ptr, size); + fast_size_ = key.size() + size; + } + + explicit RepoValue(RepoValue &&other) +noexcept : key_(std::move(other.key_)), + buffer_(std::move(other.buffer_)), + fast_size_(other.fast_size_) { + } + + ~RepoValue() + { + } + + std::string &getKey() { + return key_; + } + + /** + * Return the size of the memory within the key + * buffer, the size of timestamp, and the general + * system word size + */ + uint64_t size() { + return fast_size_; + } + + size_t bufferSize() { + return buffer_.size(); + } + + void emplace(std::string &str) { + str.insert(0, reinterpret_cast<const char*>(buffer_.data()), buffer_.size()); + } + + RepoValue &operator=(RepoValue &&other) noexcept { + key_ = std::move(other.key_); + buffer_ = std::move(other.buffer_); + other.buffer_.clear(); + return *this; + } + + private: + size_t fast_size_; + std::string key_; + std::vector<uint8_t> buffer_; + }; + + /** + * Purpose: Atomic Entry allows us to create a statically + * sized ring buffer, with the ability to create + **/ +class AtomicEntry { + + public: + AtomicEntry() + : write_pending_(false), + has_value_(false) { + + } + + bool setRepoValue(RepoValue &new_value, size_t &prev_size) { + // delete the underlying pointer + bool lock = false; + if (!write_pending_.compare_exchange_weak(lock, true) && !lock) + return false; + if (has_value_) { + prev_size = value_.size(); + } + value_ = std::move(new_value); + has_value_ = true; + try_unlock(); + return true; + } + + bool getValue(RepoValue &value) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + value = std::move(value_); + has_value_ = false; + try_unlock(); + return true; + } + + bool getValue(const std::string &key, RepoValue &value) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + if (value_.getKey() != key) { + try_unlock(); + return false; + } + value = std::move(value_); + has_value_ = false; + try_unlock(); + return true; + } + + private: + + inline void try_lock() { + bool lock = false; + while (!write_pending_.compare_exchange_weak(lock, true) && !lock) { + // attempt again + } + } + + inline void try_unlock() { + bool lock = true; + while (!write_pending_.compare_exchange_weak(lock, false) && lock) { + // attempt again + } + } + + std::atomic<bool> write_pending_; + std::atomic<bool> has_value_; + RepoValue value_; +}; + +/** + * Flow File repository + * Design: Extends Repository and implements the run function, using LevelDB as the primary substrate. + */ +class VolatileRepository : public core::Repository, + public std::enable_shared_from_this<VolatileRepository> { + public: + + static const char *volatile_repo_max_count; + // Constructor + + VolatileRepository( + std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, + int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, + int64_t maxPartitionBytes = + MAX_REPOSITORY_STORAGE_SIZE, + uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) + : Repository( + repo_name.length() > 0 ? + repo_name : core::getClassName<VolatileRepository>(), + "", maxPartitionMillis, maxPartitionBytes, purgePeriod), + max_size_(maxPartitionBytes * 0.75), + current_index_(0), + max_count_(10000) + + { + + } + + // Destructor + ~VolatileRepository() { + for (auto ent : value_vector_) { + delete ent; + } + } + + /** + * Initialize thevolatile repsitory + **/ + virtual bool initialize(const std::shared_ptr<Configure> &configure) { + std::string value = ""; + + if (configure != nullptr) { + int64_t max_cnt = 0; + std::stringstream strstream; + strstream << Configure::nifi_volatile_repository_options << getName() + << "." << volatile_repo_max_count; + if (configure->get(strstream.str(), value)) { + if (core::Property::StringToInt(value, max_cnt)) { + max_count_ = max_cnt; + } + + } + } + + logger_->log_debug("Resizing value_vector_ for %s count is %d", getName(), + max_count_); + value_vector_.reserve(max_count_); + for (int i = 0; i < max_count_; i++) { + value_vector_.emplace_back(new AtomicEntry()); + } + return true; + } + + virtual void run(); + + /** + * Places a new object into the volatile memory area + * @param key key to add to the repository + * @param buf buffer + **/ + virtual bool Put(std::string key, uint8_t *buf, int bufLen) { + RepoValue new_value(key, buf, bufLen); + + const size_t size = new_value.size(); + bool updated = false; + size_t reclaimed_size = 0; + do { + + int private_index = current_index_.fetch_add(1); + // round robin through the beginning + if (private_index >= max_count_) { + uint16_t new_index = 0; + if (current_index_.compare_exchange_weak(new_index, 0)) { + private_index = 0; + } else { + continue; + } + } + logger_->log_info("Set repo value at %d out of %d", private_index, + max_count_); + updated = value_vector_.at(private_index)->setRepoValue(new_value, + reclaimed_size); + + if (reclaimed_size > 0) { + current_size_ -= reclaimed_size; + } + + } while (!updated); + current_size_ += size; + + logger_->log_info("VolatileRepository -- put %s %d %d", key, + current_size_.load(), current_index_.load()); + return true; + } + /** + *c + * Deletes the key + * @return status of the delete operation + */ + virtual bool Delete(std::string key) { + + logger_->log_info("VolatileRepository -- delete %s", key); + for (auto ent : value_vector_) { + // let the destructor do the cleanup + RepoValue value; + if (ent->getValue(key, value)) { + current_size_ -= value.size(); + return true; + } + + } + return false; + } + /** + * Sets the value from the provided key. Once the item is retrieved + * it may not be retrieved again. + * @return status of the get operation. + */ + virtual bool Get(std::string key, std::string &value) { + for (auto ent : value_vector_) { + // let the destructor do the cleanup + RepoValue repo_value; + + if (ent->getValue(key, repo_value)) { + current_size_ -= value.size(); + repo_value.emplace(value); + logger_->log_info("VolatileRepository -- get %s %d", key, + current_size_.load()); + return true; + } + + } + return false; + } + + void setConnectionMap( + std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap) { + this->connectionMap = connectionMap; + } + void loadComponent(); + + void start() { + if (this->purge_period_ <= 0) + return; + if (running_) + return; + thread_ = std::thread(&VolatileRepository::run, shared_from_this()); + thread_.detach(); + running_ = true; + logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); + } + + protected: + + /** + * Tests whether or not the current size exceeds the capacity + * if the new prospectiveSize is inserted. + * @param prospectiveSize size of item to be added. + */ + inline bool exceedsCapacity(uint32_t prospectiveSize) { + if (current_size_ + prospectiveSize > max_size_) + return true; + else + return false; + } + /** + * Purges the volatile repository. + */ + void purge(); + + private: + std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap; + + std::atomic<uint32_t> current_size_; + std::atomic<uint16_t> current_index_; + std::vector<AtomicEntry*> value_vector_; + uint32_t max_count_; + uint32_t max_size_; + +} +; + +} /* namespace repository */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index 4119edd..2d5f293 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -49,6 +49,7 @@ class Configure { static const char *nifi_server_name; static const char *nifi_configuration_class_name; static const char *nifi_flow_repository_class_name; + static const char *nifi_volatile_repository_options; static const char *nifi_provenance_repository_class_name; static const char *nifi_server_port; static const char *nifi_server_report_interval; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/provenance/ProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h index af613a5..9e055f9 100644 --- a/libminifi/include/provenance/ProvenanceRepository.h +++ b/libminifi/include/provenance/ProvenanceRepository.h @@ -42,12 +42,12 @@ class ProvenanceRepository : public core::Repository, /*! * Create a new provenance repository */ - ProvenanceRepository(std::string directory = PROVENANCE_DIRECTORY, + ProvenanceRepository(const std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY, int64_t maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE, uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD) - : Repository(core::getClassName<ProvenanceRepository>(), directory, + : Repository(repo_name.length() > 0 ? repo_name : core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod) { db_ = NULL; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index f35e88a..c1524a2 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -41,6 +41,8 @@ const char *Configure::nifi_configuration_class_name = "nifi.flow.configuration.class.name"; const char *Configure::nifi_flow_repository_class_name = "nifi.flow.repository.class.name"; +const char *Configure::nifi_volatile_repository_options = + "nifi.volatile.repository.options."; const char *Configure::nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name"; const char *Configure::nifi_server_port = "nifi.server.port"; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 5f6e014..c87875d 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -297,9 +297,11 @@ void FlowController::loadFlowRepo() { } logger_->log_debug("Number of connections from connectionMap %d", connectionMap.size()); - auto rep = std::static_pointer_cast<core::repository::FlowFileRepository>( + auto rep = std::dynamic_pointer_cast<core::repository::FlowFileRepository>( flow_file_repo_); - rep->setConnectionMap(connectionMap); + if (nullptr != rep) { + rep->setConnectionMap(connectionMap); + } flow_file_repo_->loadComponent(); } else { logger_->log_debug("Flow file repository is not set"); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/src/core/RepositoryFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp index c24a2af..45ad980 100644 --- a/libminifi/src/core/RepositoryFactory.cpp +++ b/libminifi/src/core/RepositoryFactory.cpp @@ -24,6 +24,8 @@ #include "provenance/ProvenanceRepository.h" #endif +#include "core/repository/VolatileRepository.h" + namespace org { namespace apache { namespace nifi { @@ -40,7 +42,7 @@ class FlowFileRepository; #endif std::shared_ptr<core::Repository> createRepository( - const std::string configuration_class_name, bool fail_safe) { + const std::string configuration_class_name, bool fail_safe, const std::string repo_name) { std::shared_ptr<core::Repository> return_obj = nullptr; std::string class_name_lc = configuration_class_name; std::transform(class_name_lc.begin(), class_name_lc.end(), @@ -48,9 +50,15 @@ std::shared_ptr<core::Repository> createRepository( try { std::shared_ptr<core::Repository> return_obj = nullptr; if (class_name_lc == "flowfilerepository") { - return_obj = instantiate<core::repository::FlowFileRepository>(); + std::cout << "creating flow" << std::endl; + return_obj = instantiate<core::repository::FlowFileRepository>(repo_name); } else if (class_name_lc == "provenancerepository") { - return_obj = instantiate<provenance::ProvenanceRepository>(); + return_obj = instantiate<provenance::ProvenanceRepository>(repo_name); + } else if (class_name_lc == "volatilerepository") { + return_obj = instantiate<repository::VolatileRepository>(repo_name); + } else if (class_name_lc == "nooprepository") { + std::cout << "creating noop" << std::endl; + return_obj = instantiate<core::Repository>(repo_name); } if (return_obj) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/src/core/repository/VolatileRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/VolatileRepository.cpp b/libminifi/src/core/repository/VolatileRepository.cpp new file mode 100644 index 0000000..db036f8 --- /dev/null +++ b/libminifi/src/core/repository/VolatileRepository.cpp @@ -0,0 +1,66 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "core/repository/VolatileRepository.h" +#include <memory> +#include <string> +#include <vector> +#include "FlowFileRecord.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +const char *VolatileRepository::volatile_repo_max_count = + "max.count"; + +void VolatileRepository::run() { + repo_full_ = false; +} + +/** + * Purge + */ +void VolatileRepository::purge() { + while (current_size_ > max_size_) { + for (auto ent : value_vector_) { + // let the destructor do the cleanup + RepoValue value; + if (ent->getValue(value)) { + current_size_ -= value.size(); + logger_->log_info("VolatileRepository -- purge %s %d %d %d", + value.getKey(), current_size_.load(), max_size_, + current_index_.load()); + } + if (current_size_ < max_size_) + break; + } + } +} + +void VolatileRepository::loadComponent() { +} + +} /* namespace repository */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/test/unit/ProvenanceTestHelper.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 9dbff36..585c0d3 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -93,7 +93,7 @@ class TestRepository : public core::Repository { class TestFlowRepository : public core::repository::FlowFileRepository { public: TestFlowRepository() - : core::repository::FlowFileRepository("./dir", 1000, 100, 0) { + : core::repository::FlowFileRepository("ff","./dir", 1000, 100, 0) { } // initialize bool initialize() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/test/unit/ProvenanceTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTests.cpp b/libminifi/test/unit/ProvenanceTests.cpp index f5374b8..993fe58 100644 --- a/libminifi/test/unit/ProvenanceTests.cpp +++ b/libminifi/test/unit/ProvenanceTests.cpp @@ -27,6 +27,7 @@ #include "FlowFileRecord.h" #include "core/Core.h" #include "core/repository/FlowFileRepository.h" +#include "core/repository/VolatileRepository.h" TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") { provenance::ProvenanceEventRecord record1( @@ -72,7 +73,7 @@ TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") { attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe")); std::shared_ptr<core::repository::FlowFileRepository> frepo = std::make_shared<core::repository::FlowFileRepository>( - "./content_repository", 0, 0, 0); + "ff", "./content_repository", 0, 0, 0); std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared< minifi::FlowFileRecord>(frepo, attributes); @@ -93,3 +94,86 @@ TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") { record2.removeChildUuid(childId); REQUIRE(record2.getChildrenUuids().size() == 0); } + +TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { + provenance::ProvenanceEventRecord record1( + provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, + "componentid", "componenttype"); + + std::string eventId = record1.getEventId(); + + std::string smileyface = ":)"; + record1.setDetails(smileyface); + + uint64_t sample = 65555; + + std::shared_ptr<core::Repository> testRepository = std::make_shared< + core::repository::VolatileRepository>(); + testRepository->initialize(0); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + provenance::ProvenanceEventRecord record2; + REQUIRE(record2.DeSerialize(testRepository, eventId) == true); + REQUIRE(record2.getEventId() == record1.getEventId()); + REQUIRE(record2.getComponentId() == record1.getComponentId()); + REQUIRE(record2.getComponentType() == record1.getComponentType()); + REQUIRE(record2.getDetails() == record1.getDetails()); + REQUIRE(record2.getDetails() == smileyface); + REQUIRE(record2.getEventDuration() == sample); +} + +TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[TestFlowAndProv1]") { + provenance::ProvenanceEventRecord record1( + provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, + "componentid", "componenttype"); + std::string eventId = record1.getEventId(); + std::map<std::string, std::string> attributes; + attributes.insert(std::pair<std::string, std::string>("potato", "potatoe")); + attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe")); + std::shared_ptr<core::Repository> frepo = std::make_shared< + core::repository::VolatileRepository>(); + frepo->initialize(0); + std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared< + minifi::FlowFileRecord>(frepo, attributes); + + record1.addChildFlowFile(ffr1); + + uint64_t sample = 65555; + std::shared_ptr<core::Repository> testRepository = std::make_shared< + core::repository::VolatileRepository>(); + testRepository->initialize(0); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + provenance::ProvenanceEventRecord record2; + REQUIRE(record2.DeSerialize(testRepository, eventId) == true); + REQUIRE(record1.getChildrenUuids().size() == 1); + REQUIRE(record2.getChildrenUuids().size() == 1); + std::string childId = record2.getChildrenUuids().at(0); + REQUIRE(childId == ffr1->getUUIDStr()); + record2.removeChildUuid(childId); + REQUIRE(record2.getChildrenUuids().size() == 0); +} + +TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { + provenance::ProvenanceEventRecord record1( + provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, + "componentid", "componenttype"); + + std::string eventId = record1.getEventId(); + + std::string smileyface = ":)"; + record1.setDetails(smileyface); + + uint64_t sample = 65555; + + std::shared_ptr<core::Repository> testRepository = std::make_shared< + core::Repository>(); + testRepository->initialize(0); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + provenance::ProvenanceEventRecord record2; + REQUIRE(record2.DeSerialize(testRepository, eventId) == false); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/test/unit/RepoTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/RepoTests.cpp b/libminifi/test/unit/RepoTests.cpp index 4b6c4ad..9d22030 100644 --- a/libminifi/test/unit/RepoTests.cpp +++ b/libminifi/test/unit/RepoTests.cpp @@ -31,7 +31,8 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") { char format[] = "/tmp/testRepo.XXXXXX"; char *dir = testController.createTempDirectory(format); std::shared_ptr<core::repository::FlowFileRepository> repository = - std::make_shared<core::repository::FlowFileRepository>(dir, 0, 0, 1); + std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, + 1); repository->initialize(std::make_shared<minifi::Configure>()); @@ -51,7 +52,8 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") { char format[] = "/tmp/testRepo.XXXXXX"; char *dir = testController.createTempDirectory(format); std::shared_ptr<core::repository::FlowFileRepository> repository = - std::make_shared<core::repository::FlowFileRepository>(dir, 0, 0, 1); + std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, + 1); repository->initialize(std::make_shared<minifi::Configure>()); @@ -73,7 +75,8 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") { char format[] = "/tmp/testRepo.XXXXXX"; char *dir = testController.createTempDirectory(format); std::shared_ptr<core::repository::FlowFileRepository> repository = - std::make_shared<core::repository::FlowFileRepository>(dir, 0, 0, 1); + std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, + 1); repository->initialize( std::make_shared<org::apache::nifi::minifi::Configure>()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/main/MiNiFiMain.cpp ---------------------------------------------------------------------- diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index 3eb16ae..5944ff2 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -158,14 +158,14 @@ int main(int argc, char **argv) { prov_repo_class); // Create repos for flow record and provenance std::shared_ptr<core::Repository> prov_repo = core::createRepository( - prov_repo_class, true); + prov_repo_class, true,"provenance"); prov_repo->initialize(configure); configure->get(minifi::Configure::nifi_flow_repository_class_name, flow_repo_class); std::shared_ptr<core::Repository> flow_repo = core::createRepository( - flow_repo_class, true); + flow_repo_class, true, "flowfile"); flow_repo->initialize(configure); @@ -203,6 +203,10 @@ int main(int argc, char **argv) { * Trigger unload -- wait stop_wait_time */ controller->waitUnload(stop_wait_time); + + flow_repo = nullptr; + + prov_repo = nullptr; logger->log_info("MiNiFi exit");