Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master d26d65af4 -> cbc12903f


MINIFI-207: Use recursive mutex that avoid thread safety concerns

This closes #102.

Signed-off-by: Aldrin Piri <ald...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/cbc12903
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/cbc12903
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/cbc12903

Branch: refs/heads/master
Commit: cbc12903ffb81bf16808f61a42e772d7c4676c06
Parents: d26d65a
Author: Marc Parisi <phroc...@apache.org>
Authored: Wed May 17 16:09:07 2017 -0400
Committer: Aldrin Piri <ald...@apache.org>
Committed: Wed May 24 11:53:31 2017 -0400

----------------------------------------------------------------------
 libminifi/include/core/Core.h         |  4 ++--
 libminifi/include/core/ProcessGroup.h |  6 +++---
 libminifi/src/core/ProcessGroup.cpp   | 22 ++++++++++++----------
 3 files changed, 17 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cbc12903/libminifi/include/core/Core.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h
index 335f306..3864882 100644
--- a/libminifi/include/core/Core.h
+++ b/libminifi/include/core/Core.h
@@ -146,8 +146,8 @@ class CoreComponent {
   const std::string & getUUIDStr() {
     return uuidStr_;
   }
-  
-  void loadComponent(){
+
+  void loadComponent() {
   }
 
  protected:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cbc12903/libminifi/include/core/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessGroup.h 
b/libminifi/include/core/ProcessGroup.h
index f2f9a63..ccf744e 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -120,12 +120,12 @@ class ProcessGroup {
   bool isRootProcessGroup();
   // set parent process group
   void setParent(ProcessGroup *parent) {
-    std::lock_guard<std::mutex> lock(mutex_);
+    std::lock_guard<std::recursive_mutex> lock(mutex_);
     parent_process_group_ = parent;
   }
   // get parent process group
   ProcessGroup *getParent(void) {
-    std::lock_guard<std::mutex> lock(mutex_);
+    std::lock_guard<std::recursive_mutex> lock(mutex_);
     return parent_process_group_;
   }
   // Add processor
@@ -197,7 +197,7 @@ class ProcessGroup {
  private:
 
   // Mutex for protection
-  std::mutex mutex_;
+  std::recursive_mutex mutex_;
   // Logger
   std::shared_ptr<logging::Logger> logger_;
   // Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cbc12903/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp 
b/libminifi/src/core/ProcessGroup.cpp
index 1b8ec3a..01d3dbf 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -67,12 +67,12 @@ ProcessGroup::~ProcessGroup() {
 }
 
 bool ProcessGroup::isRootProcessGroup() {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   return (type_ == ROOT_PROCESS_GROUP);
 }
 
 void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (processors_.find(processor) == processors_.end()) {
     // We do not have the same processor in this process group yet
@@ -83,7 +83,7 @@ void ProcessGroup::addProcessor(std::shared_ptr<Processor> 
processor) {
 }
 
 void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (processors_.find(processor) != processors_.end()) {
     // We do have the same processor in this process group yet
@@ -94,7 +94,7 @@ void ProcessGroup::removeProcessor(std::shared_ptr<Processor> 
processor) {
 }
 
 void ProcessGroup::addProcessGroup(ProcessGroup *child) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (child_process_groups_.find(child) == child_process_groups_.end()) {
     // We do not have the same child process group in this process group yet
@@ -105,7 +105,7 @@ void ProcessGroup::addProcessGroup(ProcessGroup *child) {
 }
 
 void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (child_process_groups_.find(child) != child_process_groups_.end()) {
     // We do have the same child process group in this process group yet
@@ -117,7 +117,7 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
 
 void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
                                    EventDrivenSchedulingAgent *eventScheduler) 
{
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   try {
     // Start all the processor node, input and output ports
@@ -148,7 +148,7 @@ void 
ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler,
 
 void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
                                   EventDrivenSchedulingAgent *eventScheduler) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   try {
     // Stop all the processor node, input and output ports
@@ -176,6 +176,7 @@ void 
ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler,
 }
 
 std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   std::shared_ptr<Processor> ret = NULL;
   for (auto processor : processors_) {
     logger_->log_info("find processor %s", processor->getName().c_str());
@@ -220,6 +221,7 @@ std::shared_ptr<core::controller::ControllerServiceNode> 
ProcessGroup::findContr
 
 std::shared_ptr<Processor> ProcessGroup::findProcessor(
     const std::string &processorName) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   std::shared_ptr<Processor> ret = NULL;
   for (auto processor : processors_) {
     logger_->log_debug("Current processor is %s", 
processor->getName().c_str());
@@ -238,7 +240,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(
 void ProcessGroup::updatePropertyValue(std::string processorName,
                                        std::string propertyName,
                                        std::string propertyValue) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   for (auto processor : processors_) {
     if (processor->getName() == processorName) {
       processor->setProperty(propertyName, propertyValue);
@@ -262,7 +264,7 @@ void ProcessGroup::getConnections(
 }
 
 void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (connections_.find(connection) == connections_.end()) {
     // We do not have the same connection in this process group yet
@@ -285,7 +287,7 @@ void 
ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
 }
 
 void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
 
   if (connections_.find(connection) != connections_.end()) {
     // We do not have the same connection in this process group yet

Reply via email to