Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 9f98a407c -> d26d65af4


MINIFI-37: Create a volatile repository and config items for
NoOp and Volatile repository configuration

Allow the max count of the volatile repository size to be configurable

This closes #98.

Signed-off-by: Aldrin Piri <ald...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/d26d65af
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/d26d65af
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/d26d65af

Branch: refs/heads/master
Commit: d26d65af4448ddb054975601a9bf59f8660e6a8d
Parents: 9f98a40
Author: Marc Parisi <phroc...@apache.org>
Authored: Fri May 12 15:22:06 2017 -0400
Committer: Aldrin Piri <ald...@apache.org>
Committed: Wed May 24 09:23:39 2017 -0400

----------------------------------------------------------------------
 README.md                                       |  12 +
 libminifi/include/core/Core.h                   |  15 +-
 libminifi/include/core/Repository.h             |  15 +-
 libminifi/include/core/RepositoryFactory.h      |   3 +-
 libminifi/include/core/logging/BaseLogger.h     |   3 +-
 .../core/repository/FlowFileRepository.h        |  12 +-
 .../core/repository/VolatileRepository.h        | 372 +++++++++++++++++++
 libminifi/include/properties/Configure.h        |   1 +
 .../include/provenance/ProvenanceRepository.h   |   4 +-
 libminifi/src/Configure.cpp                     |   2 +
 libminifi/src/FlowController.cpp                |   6 +-
 libminifi/src/core/RepositoryFactory.cpp        |  14 +-
 .../src/core/repository/VolatileRepository.cpp  |  66 ++++
 libminifi/test/unit/ProvenanceTestHelper.h      |   2 +-
 libminifi/test/unit/ProvenanceTests.cpp         |  86 ++++-
 libminifi/test/unit/RepoTests.cpp               |   9 +-
 main/MiNiFiMain.cpp                             |   8 +-
 17 files changed, 597 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index df89fb8..1457350 100644
--- a/README.md
+++ b/README.md
@@ -299,7 +299,19 @@ Additionally, users can utilize the MiNiFi Toolkit 
Converter (version 0.0.1 - sc
 
     if you do not want to enable client certificate base authorization
     nifi.security.need.ClientAuth=false
+    
+### Configuring Volatile and NO-OP Repositories
 
+     in minifi.properties 
+     
+     # For Volatile Repositories:
+     nifi.flow.repository.class.name=VolatileRepository
+     nifi.provenance.repository.class.name=VolatileRepository
+     
+     # For NO-OP Repositories:
+        nifi.flow.repository.class.name=NoOpRepository
+     nifi.provenance.repository.class.name=NoOpRepository
+     
 ### Provenance Report
 
     Add Provenance Reporting to config.yml

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/core/Core.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h
index 453a6a5..335f306 100644
--- a/libminifi/include/core/Core.h
+++ b/libminifi/include/core/Core.h
@@ -40,8 +40,10 @@ namespace core {
 template<typename T>
 static inline std::string getClassName() {
   char *b = abi::__cxa_demangle(typeid(T).name(), 0, 0, 0);
+  if (b == nullptr)
+    return std::string();
   std::string name = b;
-  delete[] b;
+  std::free(b);
   return name;
 }
 
@@ -64,13 +66,18 @@ struct class_operations {
 };
 
 template<typename T>
-typename std::enable_if<!class_operations<T>::value, std::shared_ptr<T>>::type 
instantiate() {
+typename std::enable_if<!class_operations<T>::value, std::shared_ptr<T>>::type 
instantiate(const std::string name = "") {
   throw std::runtime_error("Cannot instantiate class");
 }
 
 template<typename T>
-typename std::enable_if<class_operations<T>::value, std::shared_ptr<T>>::type 
instantiate() {
-  return std::make_shared<T>();
+typename std::enable_if<class_operations<T>::value, std::shared_ptr<T>>::type 
instantiate(const std::string name = "") {
+  if (name.length() == 0){
+    return std::make_shared<T>();
+  }
+  else{
+    return std::make_shared<T>(name);
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/core/Repository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Repository.h 
b/libminifi/include/core/Repository.h
index 48ccc47..94517d8 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -47,14 +47,21 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
+#define REPOSITORY_DIRECTORY "./repo"
+#define MAX_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
+#define MAX_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
+#define REPOSITORY_PURGE_PERIOD (2500) // 2500 msec
+
 class Repository : public CoreComponent {
  public:
   /*
    * Constructor for the repository
    */
-  Repository(std::string repo_name, std::string directory,
-             int64_t maxPartitionMillis, int64_t maxPartitionBytes,
-             uint64_t purgePeriod)
+  Repository(std::string repo_name="Repository",
+             std::string directory = REPOSITORY_DIRECTORY,
+             int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
+             int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
+             uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
       : CoreComponent(repo_name),
         thread_() {
     directory_ = directory;
@@ -84,7 +91,7 @@ class Repository : public CoreComponent {
   }
 
   virtual bool Get(std::string key, std::string &value) {
-    return true;
+    return false;
   }
 
   // Run function for the thread

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/core/RepositoryFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/RepositoryFactory.h 
b/libminifi/include/core/RepositoryFactory.h
index ed9a026..db474a0 100644
--- a/libminifi/include/core/RepositoryFactory.h
+++ b/libminifi/include/core/RepositoryFactory.h
@@ -20,6 +20,7 @@
 #define LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_
 
 #include "core/Repository.h"
+#include "core/repository/VolatileRepository.h"
 #include "Core.h"
 
 namespace org {
@@ -30,7 +31,7 @@ namespace minifi {
 namespace core {
 
 std::shared_ptr<core::Repository> createRepository(
-    const std::string configuration_class_name, bool fail_safe = false);
+    const std::string configuration_class_name, bool fail_safe = false,const 
std::string repo_name = "");
 
 } /* namespace core */
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/core/logging/BaseLogger.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/logging/BaseLogger.h 
b/libminifi/include/core/logging/BaseLogger.h
index 9d00fb6..904bac2 100644
--- a/libminifi/include/core/logging/BaseLogger.h
+++ b/libminifi/include/core/logging/BaseLogger.h
@@ -54,7 +54,8 @@ typedef enum {
 
 template<typename ... Args>
 inline std::string format_string(char const* format_str, Args&&... args) {
-  char buf[LOG_BUFFER_SIZE];
+  char buf[LOG_BUFFER_SIZE+1] = {0};
+  
   std::snprintf(buf, LOG_BUFFER_SIZE, format_str, args...);
   return std::string(buf);
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/core/repository/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/FlowFileRepository.h 
b/libminifi/include/core/repository/FlowFileRepository.h
index 9fc13e0..051dfb0 100644
--- a/libminifi/include/core/repository/FlowFileRepository.h
+++ b/libminifi/include/core/repository/FlowFileRepository.h
@@ -47,21 +47,15 @@ class FlowFileRepository : public core::Repository,
  public:
   // Constructor
 
-  FlowFileRepository(std::string directory, int64_t maxPartitionMillis,
-                     int64_t maxPartitionBytes, uint64_t purgePeriod)
-      : Repository(core::getClassName<FlowFileRepository>(), directory,
+  FlowFileRepository(const std::string repo_name = "", std::string 
directory=FLOWFILE_REPOSITORY_DIRECTORY, int64_t 
maxPartitionMillis=MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                     int64_t 
maxPartitionBytes=MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, uint64_t 
purgePeriod=FLOWFILE_REPOSITORY_PURGE_PERIOD)
+      : Repository(repo_name.length() > 0 ? repo_name : 
core::getClassName<FlowFileRepository>(), directory,
                    maxPartitionMillis, maxPartitionBytes, purgePeriod)
 
   {
     db_ = NULL;
   }
 
-  FlowFileRepository()
-      : FlowFileRepository(FLOWFILE_REPOSITORY_DIRECTORY,
-      MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
-                           MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
-                           FLOWFILE_REPOSITORY_PURGE_PERIOD) {
-  }
 
   // Destructor
   ~FlowFileRepository() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/core/repository/VolatileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileRepository.h 
b/libminifi/include/core/repository/VolatileRepository.h
new file mode 100644
index 0000000..1e07e28
--- /dev/null
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -0,0 +1,372 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_
+
+#include "core/Repository.h"
+#include <chrono>
+#include <vector>
+#include <map>
+#include "core/Core.h"
+#include "Connection.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+static uint16_t accounting_size = sizeof(std::vector<uint8_t>)
+    + sizeof(std::string) + sizeof(size_t);
+
+class RepoValue {
+ public:
+
+  explicit RepoValue() {
+  }
+
+  explicit RepoValue(std::string key, uint8_t *ptr, size_t size)
+      : key_(key) {
+    buffer_.resize(size);
+    std::memcpy(buffer_.data(), ptr, size);
+    fast_size_ = key.size() + size;
+  }
+
+  explicit RepoValue(RepoValue &&other)
+noexcept      : key_(std::move(other.key_)),
+      buffer_(std::move(other.buffer_)),
+      fast_size_(other.fast_size_) {
+      }
+
+      ~RepoValue()
+      {
+      }
+
+      std::string &getKey() {
+        return key_;
+      }
+
+      /**
+       * Return the size of the memory within the key
+       * buffer, the size of timestamp, and the general
+       * system word size
+       */
+      uint64_t size() {
+        return fast_size_;
+      }
+
+      size_t bufferSize() {
+        return buffer_.size();
+      }
+
+      void emplace(std::string &str) {
+        str.insert(0, reinterpret_cast<const char*>(buffer_.data()), 
buffer_.size());
+      }
+
+      RepoValue &operator=(RepoValue &&other) noexcept {
+        key_ = std::move(other.key_);
+        buffer_ = std::move(other.buffer_);
+        other.buffer_.clear();
+        return *this;
+      }
+
+    private:
+      size_t fast_size_;
+      std::string key_;
+      std::vector<uint8_t> buffer_;
+    };
+
+    /**
+     * Purpose: Atomic Entry allows us to create a statically
+     * sized ring buffer, with the ability to create
+     **/
+class AtomicEntry {
+
+ public:
+  AtomicEntry()
+      : write_pending_(false),
+        has_value_(false) {
+
+  }
+
+  bool setRepoValue(RepoValue &new_value, size_t &prev_size) {
+    // delete the underlying pointer
+    bool lock = false;
+    if (!write_pending_.compare_exchange_weak(lock, true) && !lock)
+      return false;
+    if (has_value_) {
+      prev_size = value_.size();
+    }
+    value_ = std::move(new_value);
+    has_value_ = true;
+    try_unlock();
+    return true;
+  }
+
+  bool getValue(RepoValue &value) {
+    try_lock();
+    if (!has_value_) {
+      try_unlock();
+      return false;
+    }
+    value = std::move(value_);
+    has_value_ = false;
+    try_unlock();
+    return true;
+  }
+
+  bool getValue(const std::string &key, RepoValue &value) {
+    try_lock();
+    if (!has_value_) {
+      try_unlock();
+      return false;
+    }
+    if (value_.getKey() != key) {
+      try_unlock();
+      return false;
+    }
+    value = std::move(value_);
+    has_value_ = false;
+    try_unlock();
+    return true;
+  }
+
+ private:
+
+  inline void try_lock() {
+    bool lock = false;
+    while (!write_pending_.compare_exchange_weak(lock, true) && !lock) {
+      // attempt again
+    }
+  }
+
+  inline void try_unlock() {
+    bool lock = true;
+    while (!write_pending_.compare_exchange_weak(lock, false) && lock) {
+      // attempt again
+    }
+  }
+
+  std::atomic<bool> write_pending_;
+  std::atomic<bool> has_value_;
+  RepoValue value_;
+};
+
+/**
+ * Flow File repository
+ * Design: Extends Repository and implements the run function, using LevelDB 
as the primary substrate.
+ */
+class VolatileRepository : public core::Repository,
+    public std::enable_shared_from_this<VolatileRepository> {
+ public:
+
+  static const char *volatile_repo_max_count;
+  // Constructor
+
+  VolatileRepository(
+      std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY,
+      int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
+      int64_t maxPartitionBytes =
+      MAX_REPOSITORY_STORAGE_SIZE,
+      uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
+      : Repository(
+            repo_name.length() > 0 ?
+                repo_name : core::getClassName<VolatileRepository>(),
+            "", maxPartitionMillis, maxPartitionBytes, purgePeriod),
+        max_size_(maxPartitionBytes * 0.75),
+        current_index_(0),
+        max_count_(10000)
+
+  {
+
+  }
+
+  // Destructor
+  ~VolatileRepository() {
+    for (auto ent : value_vector_) {
+      delete ent;
+    }
+  }
+
+  /**
+   * Initialize thevolatile repsitory
+   **/
+  virtual bool initialize(const std::shared_ptr<Configure> &configure) {
+    std::string value = "";
+
+    if (configure != nullptr) {
+      int64_t max_cnt = 0;
+      std::stringstream strstream;
+      strstream << Configure::nifi_volatile_repository_options << getName()
+                << "." << volatile_repo_max_count;
+      if (configure->get(strstream.str(), value)) {
+        if (core::Property::StringToInt(value, max_cnt)) {
+          max_count_ = max_cnt;
+        }
+
+      }
+    }
+
+    logger_->log_debug("Resizing value_vector_ for %s count is %d", getName(),
+                       max_count_);
+    value_vector_.reserve(max_count_);
+    for (int i = 0; i < max_count_; i++) {
+      value_vector_.emplace_back(new AtomicEntry());
+    }
+    return true;
+  }
+
+  virtual void run();
+
+  /**
+   * Places a new object into the volatile memory area
+   * @param key key to add to the repository
+   * @param buf buffer 
+   **/
+  virtual bool Put(std::string key, uint8_t *buf, int bufLen) {
+    RepoValue new_value(key, buf, bufLen);
+
+    const size_t size = new_value.size();
+    bool updated = false;
+    size_t reclaimed_size = 0;
+    do {
+
+      int private_index = current_index_.fetch_add(1);
+      // round robin through the beginning
+      if (private_index >= max_count_) {
+        uint16_t new_index = 0;
+        if (current_index_.compare_exchange_weak(new_index, 0)) {
+          private_index = 0;
+        } else {
+          continue;
+        }
+      }
+      logger_->log_info("Set repo value at %d out of %d", private_index,
+                        max_count_);
+      updated = value_vector_.at(private_index)->setRepoValue(new_value,
+                                                              reclaimed_size);
+
+      if (reclaimed_size > 0) {
+        current_size_ -= reclaimed_size;
+      }
+
+    } while (!updated);
+    current_size_ += size;
+
+    logger_->log_info("VolatileRepository -- put %s %d %d", key,
+                      current_size_.load(), current_index_.load());
+    return true;
+  }
+  /**
+   *c
+   * Deletes the key
+   * @return status of the delete operation
+   */
+  virtual bool Delete(std::string key) {
+
+    logger_->log_info("VolatileRepository -- delete %s", key);
+    for (auto ent : value_vector_) {
+      // let the destructor do the cleanup
+      RepoValue value;
+      if (ent->getValue(key, value)) {
+        current_size_ -= value.size();
+        return true;
+      }
+
+    }
+    return false;
+  }
+  /**
+   * Sets the value from the provided key. Once the item is retrieved
+   * it may not be retrieved again.
+   * @return status of the get operation.
+   */
+  virtual bool Get(std::string key, std::string &value) {
+    for (auto ent : value_vector_) {
+      // let the destructor do the cleanup
+      RepoValue repo_value;
+
+      if (ent->getValue(key, repo_value)) {
+        current_size_ -= value.size();
+        repo_value.emplace(value);
+        logger_->log_info("VolatileRepository -- get %s %d", key,
+                          current_size_.load());
+        return true;
+      }
+
+    }
+    return false;
+  }
+
+  void setConnectionMap(
+      std::map<std::string, std::shared_ptr<minifi::Connection>> 
&connectionMap) {
+    this->connectionMap = connectionMap;
+  }
+  void loadComponent();
+
+  void start() {
+    if (this->purge_period_ <= 0)
+      return;
+    if (running_)
+      return;
+    thread_ = std::thread(&VolatileRepository::run, shared_from_this());
+    thread_.detach();
+    running_ = true;
+    logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
+  }
+
+ protected:
+
+  /**
+   * Tests whether or not the current size exceeds the capacity
+   * if the new prospectiveSize is inserted.
+   * @param prospectiveSize size of item to be added.
+   */
+  inline bool exceedsCapacity(uint32_t prospectiveSize) {
+    if (current_size_ + prospectiveSize > max_size_)
+      return true;
+    else
+      return false;
+  }
+  /**
+   * Purges the volatile repository.
+   */
+  void purge();
+
+ private:
+  std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
+
+  std::atomic<uint32_t> current_size_;
+  std::atomic<uint16_t> current_index_;
+  std::vector<AtomicEntry*> value_vector_;
+  uint32_t max_count_;
+  uint32_t max_size_;
+
+}
+;
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h 
b/libminifi/include/properties/Configure.h
index 4119edd..2d5f293 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -49,6 +49,7 @@ class Configure {
   static const char *nifi_server_name;
   static const char *nifi_configuration_class_name;
   static const char *nifi_flow_repository_class_name;
+  static const char *nifi_volatile_repository_options;
   static const char *nifi_provenance_repository_class_name;
   static const char *nifi_server_port;
   static const char *nifi_server_report_interval;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/include/provenance/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceRepository.h 
b/libminifi/include/provenance/ProvenanceRepository.h
index af613a5..9e055f9 100644
--- a/libminifi/include/provenance/ProvenanceRepository.h
+++ b/libminifi/include/provenance/ProvenanceRepository.h
@@ -42,12 +42,12 @@ class ProvenanceRepository : public core::Repository,
   /*!
    * Create a new provenance repository
    */
-  ProvenanceRepository(std::string directory = PROVENANCE_DIRECTORY,
+  ProvenanceRepository(const std::string repo_name = "", std::string directory 
= PROVENANCE_DIRECTORY,
                        int64_t maxPartitionMillis =
                        MAX_PROVENANCE_ENTRY_LIFE_TIME,
                        int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE,
                        uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD)
-      : Repository(core::getClassName<ProvenanceRepository>(), directory,
+      : Repository(repo_name.length() > 0 ? repo_name : 
core::getClassName<ProvenanceRepository>(), directory,
                    maxPartitionMillis, maxPartitionBytes, purgePeriod) {
 
     db_ = NULL;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index f35e88a..c1524a2 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -41,6 +41,8 @@ const char *Configure::nifi_configuration_class_name =
     "nifi.flow.configuration.class.name";
 const char *Configure::nifi_flow_repository_class_name =
     "nifi.flow.repository.class.name";
+const char *Configure::nifi_volatile_repository_options =
+    "nifi.volatile.repository.options.";
 const char *Configure::nifi_provenance_repository_class_name =
     "nifi.provenance.repository.class.name";
 const char *Configure::nifi_server_port = "nifi.server.port";

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 5f6e014..c87875d 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -297,9 +297,11 @@ void FlowController::loadFlowRepo() {
     }
     logger_->log_debug("Number of connections from connectionMap %d",
                        connectionMap.size());
-    auto rep = std::static_pointer_cast<core::repository::FlowFileRepository>(
+    auto rep = std::dynamic_pointer_cast<core::repository::FlowFileRepository>(
         flow_file_repo_);
-    rep->setConnectionMap(connectionMap);
+    if (nullptr != rep) {
+      rep->setConnectionMap(connectionMap);
+    }
     flow_file_repo_->loadComponent();
   } else {
     logger_->log_debug("Flow file repository is not set");

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/src/core/RepositoryFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/RepositoryFactory.cpp 
b/libminifi/src/core/RepositoryFactory.cpp
index c24a2af..45ad980 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -24,6 +24,8 @@
 #include "provenance/ProvenanceRepository.h"
 #endif
 
+#include "core/repository/VolatileRepository.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -40,7 +42,7 @@ class FlowFileRepository;
 #endif
 
 std::shared_ptr<core::Repository> createRepository(
-    const std::string configuration_class_name, bool fail_safe) {
+    const std::string configuration_class_name, bool fail_safe, const 
std::string repo_name) {
   std::shared_ptr<core::Repository> return_obj = nullptr;
   std::string class_name_lc = configuration_class_name;
   std::transform(class_name_lc.begin(), class_name_lc.end(),
@@ -48,9 +50,15 @@ std::shared_ptr<core::Repository> createRepository(
   try {
     std::shared_ptr<core::Repository> return_obj = nullptr;
     if (class_name_lc == "flowfilerepository") {
-      return_obj = instantiate<core::repository::FlowFileRepository>();
+      std::cout << "creating flow" << std::endl;
+      return_obj = 
instantiate<core::repository::FlowFileRepository>(repo_name);
     } else if (class_name_lc == "provenancerepository") {
-      return_obj = instantiate<provenance::ProvenanceRepository>();
+      return_obj = instantiate<provenance::ProvenanceRepository>(repo_name);
+    } else if (class_name_lc == "volatilerepository") {
+      return_obj = instantiate<repository::VolatileRepository>(repo_name);
+    } else if (class_name_lc == "nooprepository") {
+      std::cout << "creating noop" << std::endl;
+      return_obj = instantiate<core::Repository>(repo_name);
     }
 
     if (return_obj) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/src/core/repository/VolatileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileRepository.cpp 
b/libminifi/src/core/repository/VolatileRepository.cpp
new file mode 100644
index 0000000..db036f8
--- /dev/null
+++ b/libminifi/src/core/repository/VolatileRepository.cpp
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/repository/VolatileRepository.h"
+#include <memory>
+#include <string>
+#include <vector>
+#include "FlowFileRecord.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+const char *VolatileRepository::volatile_repo_max_count =
+    "max.count";
+
+void VolatileRepository::run() {
+  repo_full_ = false;
+}
+
+/**
+ * Purge
+ */
+void VolatileRepository::purge() {
+  while (current_size_ > max_size_) {
+    for (auto ent : value_vector_) {
+      // let the destructor do the cleanup
+      RepoValue value;
+      if (ent->getValue(value)) {
+        current_size_ -= value.size();
+        logger_->log_info("VolatileRepository -- purge %s %d %d %d",
+                          value.getKey(), current_size_.load(), max_size_,
+                          current_index_.load());
+      }
+      if (current_size_ < max_size_)
+        break;
+    }
+  }
+}
+
+void VolatileRepository::loadComponent() {
+}
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h 
b/libminifi/test/unit/ProvenanceTestHelper.h
index 9dbff36..585c0d3 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -93,7 +93,7 @@ class TestRepository : public core::Repository {
 class TestFlowRepository : public core::repository::FlowFileRepository {
  public:
   TestFlowRepository()
-      : core::repository::FlowFileRepository("./dir", 1000, 100, 0) {
+      : core::repository::FlowFileRepository("ff","./dir", 1000, 100, 0) {
   }
   // initialize
   bool initialize() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/test/unit/ProvenanceTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTests.cpp 
b/libminifi/test/unit/ProvenanceTests.cpp
index f5374b8..993fe58 100644
--- a/libminifi/test/unit/ProvenanceTests.cpp
+++ b/libminifi/test/unit/ProvenanceTests.cpp
@@ -27,6 +27,7 @@
 #include "FlowFileRecord.h"
 #include "core/Core.h"
 #include "core/repository/FlowFileRepository.h"
+#include "core/repository/VolatileRepository.h"
 
 TEST_CASE("Test Provenance record create", 
"[Testprovenance::ProvenanceEventRecord]") {
   provenance::ProvenanceEventRecord record1(
@@ -72,7 +73,7 @@ TEST_CASE("Test Flowfile record added to provenance", 
"[TestFlowAndProv1]") {
   attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe"));
   std::shared_ptr<core::repository::FlowFileRepository> frepo =
       std::make_shared<core::repository::FlowFileRepository>(
-          "./content_repository", 0, 0, 0);
+          "ff", "./content_repository", 0, 0, 0);
   std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<
       minifi::FlowFileRecord>(frepo, attributes);
 
@@ -93,3 +94,86 @@ TEST_CASE("Test Flowfile record added to provenance", 
"[TestFlowAndProv1]") {
   record2.removeChildUuid(childId);
   REQUIRE(record2.getChildrenUuids().size() == 0);
 }
+
+TEST_CASE("Test Provenance record serialization Volatile", 
"[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
+  provenance::ProvenanceEventRecord record1(
+      provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE,
+      "componentid", "componenttype");
+
+  std::string eventId = record1.getEventId();
+
+  std::string smileyface = ":)";
+  record1.setDetails(smileyface);
+
+  uint64_t sample = 65555;
+
+  std::shared_ptr<core::Repository> testRepository = std::make_shared<
+      core::repository::VolatileRepository>();
+  testRepository->initialize(0);
+  record1.setEventDuration(sample);
+
+  record1.Serialize(testRepository);
+  provenance::ProvenanceEventRecord record2;
+  REQUIRE(record2.DeSerialize(testRepository, eventId) == true);
+  REQUIRE(record2.getEventId() == record1.getEventId());
+  REQUIRE(record2.getComponentId() == record1.getComponentId());
+  REQUIRE(record2.getComponentType() == record1.getComponentType());
+  REQUIRE(record2.getDetails() == record1.getDetails());
+  REQUIRE(record2.getDetails() == smileyface);
+  REQUIRE(record2.getEventDuration() == sample);
+}
+
+TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", 
"[TestFlowAndProv1]") {
+  provenance::ProvenanceEventRecord record1(
+      provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE,
+      "componentid", "componenttype");
+  std::string eventId = record1.getEventId();
+  std::map<std::string, std::string> attributes;
+  attributes.insert(std::pair<std::string, std::string>("potato", "potatoe"));
+  attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe"));
+  std::shared_ptr<core::Repository> frepo = std::make_shared<
+      core::repository::VolatileRepository>();
+  frepo->initialize(0);
+  std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<
+      minifi::FlowFileRecord>(frepo, attributes);
+
+  record1.addChildFlowFile(ffr1);
+
+  uint64_t sample = 65555;
+  std::shared_ptr<core::Repository> testRepository = std::make_shared<
+      core::repository::VolatileRepository>();
+  testRepository->initialize(0);
+  record1.setEventDuration(sample);
+
+  record1.Serialize(testRepository);
+  provenance::ProvenanceEventRecord record2;
+  REQUIRE(record2.DeSerialize(testRepository, eventId) == true);
+  REQUIRE(record1.getChildrenUuids().size() == 1);
+  REQUIRE(record2.getChildrenUuids().size() == 1);
+  std::string childId = record2.getChildrenUuids().at(0);
+  REQUIRE(childId == ffr1->getUUIDStr());
+  record2.removeChildUuid(childId);
+  REQUIRE(record2.getChildrenUuids().size() == 0);
+}
+
+TEST_CASE("Test Provenance record serialization NoOp", 
"[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
+  provenance::ProvenanceEventRecord record1(
+      provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE,
+      "componentid", "componenttype");
+
+  std::string eventId = record1.getEventId();
+
+  std::string smileyface = ":)";
+  record1.setDetails(smileyface);
+
+  uint64_t sample = 65555;
+
+  std::shared_ptr<core::Repository> testRepository = std::make_shared<
+      core::Repository>();
+  testRepository->initialize(0);
+  record1.setEventDuration(sample);
+
+  record1.Serialize(testRepository);
+  provenance::ProvenanceEventRecord record2;
+  REQUIRE(record2.DeSerialize(testRepository, eventId) == false);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/libminifi/test/unit/RepoTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/RepoTests.cpp 
b/libminifi/test/unit/RepoTests.cpp
index 4b6c4ad..9d22030 100644
--- a/libminifi/test/unit/RepoTests.cpp
+++ b/libminifi/test/unit/RepoTests.cpp
@@ -31,7 +31,8 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
   char format[] = "/tmp/testRepo.XXXXXX";
   char *dir = testController.createTempDirectory(format);
   std::shared_ptr<core::repository::FlowFileRepository> repository =
-      std::make_shared<core::repository::FlowFileRepository>(dir, 0, 0, 1);
+      std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0,
+                                                             1);
 
   repository->initialize(std::make_shared<minifi::Configure>());
 
@@ -51,7 +52,8 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
   char format[] = "/tmp/testRepo.XXXXXX";
   char *dir = testController.createTempDirectory(format);
   std::shared_ptr<core::repository::FlowFileRepository> repository =
-      std::make_shared<core::repository::FlowFileRepository>(dir, 0, 0, 1);
+      std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0,
+                                                             1);
 
   repository->initialize(std::make_shared<minifi::Configure>());
 
@@ -73,7 +75,8 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
   char format[] = "/tmp/testRepo.XXXXXX";
   char *dir = testController.createTempDirectory(format);
   std::shared_ptr<core::repository::FlowFileRepository> repository =
-      std::make_shared<core::repository::FlowFileRepository>(dir, 0, 0, 1);
+      std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0,
+                                                             1);
 
   repository->initialize(
       std::make_shared<org::apache::nifi::minifi::Configure>());

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d26d65af/main/MiNiFiMain.cpp
----------------------------------------------------------------------
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index 3eb16ae..5944ff2 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -158,14 +158,14 @@ int main(int argc, char **argv) {
                  prov_repo_class);
   // Create repos for flow record and provenance
   std::shared_ptr<core::Repository> prov_repo = core::createRepository(
-      prov_repo_class, true);
+      prov_repo_class, true,"provenance");
   prov_repo->initialize(configure);
 
   configure->get(minifi::Configure::nifi_flow_repository_class_name,
                  flow_repo_class);
 
   std::shared_ptr<core::Repository> flow_repo = core::createRepository(
-      flow_repo_class, true);
+      flow_repo_class, true, "flowfile");
 
   flow_repo->initialize(configure);
 
@@ -203,6 +203,10 @@ int main(int argc, char **argv) {
    * Trigger unload -- wait stop_wait_time
    */
   controller->waitUnload(stop_wait_time);
+  
+  flow_repo = nullptr;
+  
+  prov_repo = nullptr;
 
   logger->log_info("MiNiFi exit");
 

Reply via email to