Repository: nifi-minifi-cpp Updated Branches: refs/heads/master d26d65af4 -> cbc12903f
MINIFI-207: Use a recursive mutex to avoid thread-safety concerns. This closes #102. Signed-off-by: Aldrin Piri <ald...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/cbc12903 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/cbc12903 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/cbc12903 Branch: refs/heads/master Commit: cbc12903ffb81bf16808f61a42e772d7c4676c06 Parents: d26d65a Author: Marc Parisi <phroc...@apache.org> Authored: Wed May 17 16:09:07 2017 -0400 Committer: Aldrin Piri <ald...@apache.org> Committed: Wed May 24 11:53:31 2017 -0400 ---------------------------------------------------------------------- libminifi/include/core/Core.h | 4 ++-- libminifi/include/core/ProcessGroup.h | 6 +++--- libminifi/src/core/ProcessGroup.cpp | 22 ++++++++++++---------- 3 files changed, 17 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cbc12903/libminifi/include/core/Core.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h index 335f306..3864882 100644 --- a/libminifi/include/core/Core.h +++ b/libminifi/include/core/Core.h @@ -146,8 +146,8 @@ class CoreComponent { const std::string & getUUIDStr() { return uuidStr_; } - - void loadComponent(){ + + void loadComponent() { } protected: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cbc12903/libminifi/include/core/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index f2f9a63..ccf744e 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -120,12 +120,12 @@ class ProcessGroup { bool isRootProcessGroup(); 
// set parent process group void setParent(ProcessGroup *parent) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); parent_process_group_ = parent; } // get parent process group ProcessGroup *getParent(void) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); return parent_process_group_; } // Add processor @@ -197,7 +197,7 @@ class ProcessGroup { private: // Mutex for protection - std::mutex mutex_; + std::recursive_mutex mutex_; // Logger std::shared_ptr<logging::Logger> logger_; // Prevent default copy constructor and assignment operation http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/cbc12903/libminifi/src/core/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 1b8ec3a..01d3dbf 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -67,12 +67,12 @@ ProcessGroup::~ProcessGroup() { } bool ProcessGroup::isRootProcessGroup() { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); return (type_ == ROOT_PROCESS_GROUP); } void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); if (processors_.find(processor) == processors_.end()) { // We do not have the same processor in this process group yet @@ -83,7 +83,7 @@ void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) { } void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); if (processors_.find(processor) != processors_.end()) { // We do have the same processor in this process group yet @@ -94,7 +94,7 @@ void 
ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) { } void ProcessGroup::addProcessGroup(ProcessGroup *child) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); if (child_process_groups_.find(child) == child_process_groups_.end()) { // We do not have the same child process group in this process group yet @@ -105,7 +105,7 @@ void ProcessGroup::addProcessGroup(ProcessGroup *child) { } void ProcessGroup::removeProcessGroup(ProcessGroup *child) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); if (child_process_groups_.find(child) != child_process_groups_.end()) { // We do have the same child process group in this process group yet @@ -117,7 +117,7 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) { void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); try { // Start all the processor node, input and output ports @@ -148,7 +148,7 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); try { // Stop all the processor node, input and output ports @@ -176,6 +176,7 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, } std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) { + std::lock_guard<std::recursive_mutex> lock(mutex_); std::shared_ptr<Processor> ret = NULL; for (auto processor : processors_) { logger_->log_info("find processor %s", processor->getName().c_str()); @@ -220,6 +221,7 @@ std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findContr std::shared_ptr<Processor> 
ProcessGroup::findProcessor( const std::string &processorName) { + std::lock_guard<std::recursive_mutex> lock(mutex_); std::shared_ptr<Processor> ret = NULL; for (auto processor : processors_) { logger_->log_debug("Current processor is %s", processor->getName().c_str()); @@ -238,7 +240,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor( void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); for (auto processor : processors_) { if (processor->getName() == processorName) { processor->setProperty(propertyName, propertyValue); @@ -262,7 +264,7 @@ void ProcessGroup::getConnections( } void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); if (connections_.find(connection) == connections_.end()) { // We do not have the same connection in this process group yet @@ -285,7 +287,7 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) { } void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); if (connections_.find(connection) != connections_.end()) { // We do not have the same connection in this process group yet