Changes have been pushed for the project "Fawkes Robotics Software Framework".
Gitweb: http://git.fawkesrobotics.org/fawkes.git Trac: http://trac.fawkesrobotics.org The branch, thofmann/syncpoint has been updated to b8a77f3cd32f2d804362ee7eecd7d43506c145f8 (commit) via b19db8bfd7f954a223412c353b01f75a0340937a (commit) from fa732dd7c821da90d2240f2f40e40565cfe5dac8 (commit) http://git.fawkesrobotics.org/fawkes.git/thofmann/syncpoint Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below. - *Log* --------------------------------------------------------------- commit b19db8bfd7f954a223412c353b01f75a0340937a Author: Till Hofmann <hofm...@kbsg.rwth-aachen.de> AuthorDate: Sun Jan 25 02:27:48 2015 +0100 Commit: Till Hofmann <hofm...@kbsg.rwth-aachen.de> CommitDate: Mon Jan 26 17:22:22 2015 +0100 syncpoint: add SyncBarriers SyncBarriers are similar to SyncPoints but serve a different purpose: Instead of a single emitter, multiple emitters can register for a SyncBarrier. A waiter can continue only after all registered emitters have emitted the barrier. Also, if the barrier has already been emitted, a waiter does not block at all but instead wait() returns immediately. The manager's reset_syncbarriers() method needs to be called every iteration. It resets every barrier such that all registered emitters are again pending, i.e. waiters will block until the barrier is emitted again. 
http://git.fawkesrobotics.org/fawkes.git/commit/b19db8b http://trac.fawkesrobotics.org/changeset/b19db8b - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - commit b8a77f3cd32f2d804362ee7eecd7d43506c145f8 Author: Till Hofmann <hofm...@kbsg.rwth-aachen.de> AuthorDate: Mon Jan 26 16:50:14 2015 +0100 Commit: Till Hofmann <hofm...@kbsg.rwth-aachen.de> CommitDate: Mon Jan 26 17:22:22 2015 +0100 syncpoint: add SyncBarrier tests http://git.fawkesrobotics.org/fawkes.git/commit/b8a77f3 http://trac.fawkesrobotics.org/changeset/b8a77f3 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - *Summary* ----------------------------------------------------------- src/libs/syncpoint/exceptions.h | 37 ++++ src/libs/syncpoint/syncbarrier.cpp | 159 ++++++++++++++++ src/libs/syncpoint/syncbarrier.h | 65 +++++++ src/libs/syncpoint/syncpoint.cpp | 3 + src/libs/syncpoint/syncpoint.h | 14 +- src/libs/syncpoint/syncpoint_manager.cpp | 83 +++++++++ src/libs/syncpoint/syncpoint_manager.h | 14 ++- src/libs/syncpoint/tests/test_syncpoint.cpp | 260 +++++++++++++++++++++++++++ 8 files changed, 628 insertions(+), 7 deletions(-) create mode 100644 src/libs/syncpoint/syncbarrier.cpp create mode 100644 src/libs/syncpoint/syncbarrier.h - *Diffs* ------------------------------------------------------------- - *commit* b19db8bfd7f954a223412c353b01f75a0340937a - - - - - - - - - - Author: Till Hofmann <hofm...@kbsg.rwth-aachen.de> Date: Sun Jan 25 02:27:48 2015 +0100 Subject: syncpoint: add SyncBarriers src/libs/syncpoint/exceptions.h | 37 +++++++ src/libs/syncpoint/syncbarrier.cpp | 159 ++++++++++++++++++++++++++++++ src/libs/syncpoint/syncbarrier.h | 65 ++++++++++++ src/libs/syncpoint/syncpoint.cpp | 3 + src/libs/syncpoint/syncpoint.h | 14 ++- src/libs/syncpoint/syncpoint_manager.cpp | 83 ++++++++++++++++ src/libs/syncpoint/syncpoint_manager.h | 14 ++- 7 files changed, 368 insertions(+), 7 deletions(-) _Diff for modified files_: diff --git 
a/src/libs/syncpoint/exceptions.h b/src/libs/syncpoint/exceptions.h index a1bf769..6d69606 100644 --- a/src/libs/syncpoint/exceptions.h +++ b/src/libs/syncpoint/exceptions.h @@ -171,6 +171,43 @@ class SyncPointMultipleWaitCallsException : public Exception } }; +/** A component tried to register as emitter but is already registered + * + */ +class SyncBarrierMultipleRegisterCallsException : public Exception +{ +public: + /** Constructor. + * @param component The calling component + * @param identifier The identifier of the SyncBarrier + */ + SyncBarrierMultipleRegisterCallsException(const char * component, + const char * identifier) + { + append("Component '%s' called register_emitter() on SyncBarrier '%s', but is already registered", + component, identifier); + } +}; + +/** Emit was called on a SyncBarrier but the calling component is not registered + * as emitter + */ +class SyncBarrierNonEmitterCalledEmitException : public Exception +{ + public: + /** Constructor. + * @param component The calling component + * @param identifier The identifier of the SyncPoint + */ + SyncBarrierNonEmitterCalledEmitException(const char * component, + const char *identifier) + { + append("Component '%s' called emit for SyncBarrier '%s', " + "but is not a registered emitter", + component, identifier); + } +}; + } // namespace fawkes diff --git a/src/libs/syncpoint/syncpoint.cpp b/src/libs/syncpoint/syncpoint.cpp index a25d3ac..e945de7 100644 --- a/src/libs/syncpoint/syncpoint.cpp +++ b/src/libs/syncpoint/syncpoint.cpp @@ -154,6 +154,9 @@ SyncPoint::wait(const std::string & component) { /** Add a watcher to the watch list * @param watcher the new watcher + * @return A pair, of which the first element is an iterator that points + * to the possibly inserted element, and the second is a bool + * that is true if the element was actually inserted. 
*/ pair<set<string>::iterator,bool> SyncPoint::add_watcher(string watcher) diff --git a/src/libs/syncpoint/syncpoint.h b/src/libs/syncpoint/syncpoint.h index fb54f05..813377c 100644 --- a/src/libs/syncpoint/syncpoint.h +++ b/src/libs/syncpoint/syncpoint.h @@ -23,7 +23,6 @@ #define __SYNCPOINT_SYNCPOINT_H_ #include <interface/interface.h> -#include <syncpoint/syncpoint_manager.h> #include <syncpoint/syncpoint_call.h> #include <core/threading/mutex.h> #include <core/threading/wait_condition.h> @@ -40,7 +39,6 @@ namespace fawkes { #endif class SyncPointManager; -class SyncPointCall; class SyncPoint @@ -69,19 +67,27 @@ class SyncPoint */ friend class SyncPointManager; - private: + protected: std::pair<std::set<std::string>::iterator,bool> add_watcher(std::string watcher); - private: + protected: + /** The unique identifier of the SyncPoint */ const std::string identifier_; + /** Set of all components which use this SyncPoint */ std::set<std::string> watchers_; + /** Set of all components which are currently waiting for the SyncPoint */ std::set<std::string> waiting_watchers_; + /** A buffer of the most recent emit calls. */ CircularBuffer<SyncPointCall> emit_calls_; + /** A buffer of the most recent wait calls. 
*/ CircularBuffer<SyncPointCall> wait_calls_; + /** Time when this SyncPoint was created */ const Time creation_time_; + /** Mutex used to protect all member variables */ Mutex *mutex_; + /** WaitCondition which is used for wait() and emit() */ WaitCondition *wait_condition_; }; diff --git a/src/libs/syncpoint/syncpoint_manager.cpp b/src/libs/syncpoint/syncpoint_manager.cpp index a06ef48..8ae57a8 100644 --- a/src/libs/syncpoint/syncpoint_manager.cpp +++ b/src/libs/syncpoint/syncpoint_manager.cpp @@ -75,6 +75,7 @@ SyncPointManager::get_syncpoint(const std::string & component, const std::string // insert a new SyncPoint if no SyncPoint with the same identifier exists, // otherwise, use that SyncPoint std::pair<std::set<RefPtr<SyncPoint> >::iterator,bool> ret; + // TODO clean this up we don't need to catch the exception try { ret = syncpoints_.insert(RefPtr<SyncPoint>(new SyncPoint(identifier))); } catch (const SyncPointInvalidIdentifierException &e) { @@ -94,6 +95,40 @@ SyncPointManager::get_syncpoint(const std::string & component, const std::string } /** + * Get a SyncBarrier. This allows accessing the SyncBarrier's wait() and emit() methods + * @param component The name of the component calling the method + * @param identifier The identifier of the requested SyncBarrier + * @return A RefPtr to a SyncBarrier which is shared by all threads with this + * SyncBarrier. 
+ * @throw SyncPointInvalidComponentException thrown if component name is invalid + * @throw SyncPointAlreadyOpenedException thrown if SyncPoint is already opened + * by the component + */ +RefPtr<SyncBarrier> +SyncPointManager::get_syncbarrier(const std::string & component, const std::string & identifier) +{ + MutexLocker ml(mutex_); + if (component == "") { + throw SyncPointInvalidComponentException(component.c_str(), identifier.c_str()); + } + // insert a new SyncBarrier if no SyncBarrier with the same identifier exists, + // otherwise, use that SyncBarrier + std::pair<std::set<RefPtr<SyncBarrier> >::iterator, bool> ret = + syncbarriers_.insert(RefPtr<SyncBarrier>(new SyncBarrier(identifier))); + + std::set<RefPtr<SyncBarrier> >::iterator it = ret.first; + + // add component to the set of watchers + // check if component is already a watcher + // insert returns a pair whose second element is false if element already exists + if (!(*it)->add_watcher(component).second) { + throw SyncPointAlreadyOpenedException(component.c_str(), identifier.c_str()); + } + + return *it; +} + +/** * Release a SyncPoint. After releasing the SyncPoint, its wait() and emit() * methods cannot be called anymore by the releasing component * @param component The releasing component @@ -117,6 +152,29 @@ SyncPointManager::release_syncpoint(const std::string & component, RefPtr<SyncPo } } +/** + * Release a SyncBarrier. After releasing the SyncBarrier, its wait() and emit() + * methods cannot be called anymore by the releasing component + * @param component The releasing component + * @param sync_barrier A RefPtr to the released SyncBarrier + * @throw SyncPointReleasedDoesNotExistException thrown if the SyncBarrier doesn't + * exist, i.e. is not in the list of the manager's SyncBarriers. 
+ * @throw SyncPointReleasedByNonWatcherException The releasing component is not + * a watcher of the SyncBarrier + */ +void +SyncPointManager::release_syncbarrier(const std::string & component, RefPtr<SyncBarrier> sync_barrier) +{ + MutexLocker ml(mutex_); + std::set<RefPtr<SyncBarrier> >::iterator sp_it = syncbarriers_.find( + sync_barrier); + if (sp_it == syncbarriers_.end()) { + throw SyncPointReleasedDoesNotExistException(component.c_str(), sync_barrier->get_identifier().c_str()); + } + if (!(*sp_it)->watchers_.erase(component)) { + throw SyncPointReleasedByNonWatcherException(component.c_str(), sync_barrier->get_identifier().c_str()); + } +} /** @class SyncPointSetLessThan "syncpoint_manager.h" * Compare sets of syncpoints @@ -146,6 +204,31 @@ SyncPointManager::get_syncpoints() { } /** + * Get the current list of all SyncBarriers managed by this SyncPointManager + * @return a set of SyncBarriers + */ +std::set<RefPtr<SyncBarrier>, SyncPointSetLessThan > +SyncPointManager::get_syncbarriers() { + MutexLocker ml(mutex_); + return syncbarriers_; +} + +/** + * Reset all SyncBarriers. Resetting the barrier causes all registered emitters + * to be pending again, every registered emitter has to emit the barrier before + * a waiter unblocks. 
+ */ +void +SyncPointManager::reset_syncbarriers() +{ + MutexLocker ml(mutex_); + for (std::set<RefPtr<SyncBarrier> >::iterator it = syncbarriers_.begin(); + it != syncbarriers_.end(); it++) { + (*it)->reset_emitters(); + } +} + +/** * Get DOT graph for all SyncPoints * @param max_age Show only SyncPoint calls which are younger than max_age * @return string representation of DOT graph diff --git a/src/libs/syncpoint/syncpoint_manager.h b/src/libs/syncpoint/syncpoint_manager.h index d670237..acf2fdb 100644 --- a/src/libs/syncpoint/syncpoint_manager.h +++ b/src/libs/syncpoint/syncpoint_manager.h @@ -26,6 +26,7 @@ #include <string> #include <syncpoint/syncpoint.h> +#include <syncpoint/syncbarrier.h> #include <core/utils/refptr.h> #include <core/threading/mutex.h> @@ -50,15 +51,22 @@ class SyncPointManager RefPtr<SyncPoint> get_syncpoint(const std::string & component, const std::string & identifier); void release_syncpoint(const std::string & component, RefPtr<SyncPoint> syncpoint); + RefPtr<SyncBarrier> get_syncbarrier(const std::string & component, const std::string & identifier); + void release_syncbarrier(const std::string & component, RefPtr<SyncBarrier> syncpoint); + std::set<RefPtr<SyncPoint>, SyncPointSetLessThan > get_syncpoints(); + std::set<RefPtr<SyncBarrier>, SyncPointSetLessThan > get_syncbarriers(); + + void reset_syncbarriers(); + std::string all_syncpoints_as_dot(float max_age); protected: - /** - * Set of all existing SyncPoints - */ + /** Set of all existing SyncPoints */ std::set<RefPtr<SyncPoint>, SyncPointSetLessThan > syncpoints_; + /** Set of all existing SyncBarriers */ + std::set<RefPtr<SyncBarrier>, SyncPointSetLessThan > syncbarriers_; /** Mutex used for all SyncPointManager calls */ Mutex *mutex_; - *commit* b8a77f3cd32f2d804362ee7eecd7d43506c145f8 - - - - - - - - - - Author: Till Hofmann <hofm...@kbsg.rwth-aachen.de> Date: Mon Jan 26 16:50:14 2015 +0100 Subject: syncpoint: add SyncBarrier tests src/libs/syncpoint/tests/test_syncpoint.cpp | 
260 +++++++++++++++++++++++++++ 1 files changed, 260 insertions(+), 0 deletions(-) _Diff for modified files_: diff --git a/src/libs/syncpoint/tests/test_syncpoint.cpp b/src/libs/syncpoint/tests/test_syncpoint.cpp index 44a6106..0b7a740 100644 --- a/src/libs/syncpoint/tests/test_syncpoint.cpp +++ b/src/libs/syncpoint/tests/test_syncpoint.cpp @@ -103,6 +103,18 @@ class SyncPointManagerTest : public ::testing::Test pthread_attr_t attrs; }; +/** @class SyncBarrierTest + * Test SyncBarriers + */ +class SyncBarrierTest : public SyncPointManagerTest +{ +protected: + /** Constructor. */ + SyncBarrierTest() + { + } +}; + TEST_F(SyncPointTest, CreateSyncPoint) { ASSERT_TRUE(*sp1 != NULL); @@ -367,3 +379,251 @@ TEST_F(SyncPointManagerTest, WaitDoesNotReturnImmediately) delete params[i]; } } + +/** get a SyncBarrier and wait for it */ +void * start_barrier_waiter_thread(void * data) { + waiter_thread_params *params = (waiter_thread_params *)data; + char *comp; + asprintf(&comp, "component %u", params->thread_nr); + string component = comp; + free(comp); + RefPtr<SyncBarrier> sp; + EXPECT_NO_THROW(sp = params->manager->get_syncbarrier(component, params->sp_identifier)); + for (uint i = 0; i < params->num_wait_calls; i++) { + sp->wait(component); + } + pthread_exit(NULL); +} + +/** get a SyncBarrier, register as emitter and emit */ +void * start_barrier_emitter_thread(void * data) { + waiter_thread_params *params = (waiter_thread_params *)data; + char *comp; + asprintf(&comp, "emitter %u", params->thread_nr); + string component = comp; + free(comp); + RefPtr<SyncBarrier> sp; + EXPECT_NO_THROW(sp = params->manager->get_syncbarrier(component, params->sp_identifier)); + sp->register_emitter(component); + for (uint i = 0; i < params->num_wait_calls; i++) { + sp->emit(component); + } + pthread_exit(NULL); +} + +/** Helper class which registers and emits a given SyncBarrier */ +class Emitter { +public: + /** Constructor. + * @param identifier The identifier of this emitter. 
+ * @param syncbarrier The identifier of the SyncBarrier to register for. + * @param manager Pointer to the SyncPointManager to use. + */ + Emitter(string identifier, string syncbarrier, RefPtr<SyncPointManager> manager) + : identifier_(identifier), + manager_(manager) + { + barrier_ = manager->get_syncbarrier(identifier_, syncbarrier); + barrier_->register_emitter(identifier_); + } + + /** Destructor. */ + virtual ~Emitter() + { + barrier_->unregister_emitter(identifier_); + manager_->release_syncbarrier(identifier_, barrier_); + } + + /** emit the SyncBarrier */ + void emit() + { + barrier_->emit(identifier_); + } + +private: + string identifier_; + RefPtr<SyncBarrier> barrier_; + RefPtr<SyncPointManager> manager_; +}; + + +/** Barrier: wait() returns immediately if no emitter is registered */ +TEST_F(SyncBarrierTest,WaitWithNoRegisteredEmitter) +{ + string barrier_id = "/test/barrier"; + RefPtr<SyncBarrier> barrier = manager->get_syncbarrier("main loop", barrier_id); + barrier->reset_emitters(); + const uint num_waiter_threads = 1; + const uint num_wait_calls = 1; + pthread_t waiter_threads[num_waiter_threads]; + waiter_thread_params *params[num_waiter_threads]; + for (uint i = 0; i < num_waiter_threads; i++) { + params[i] = new waiter_thread_params(); + params[i]->manager = manager; + params[i]->thread_nr = i; + params[i]->num_wait_calls = num_wait_calls; + params[i]->sp_identifier = barrier_id; + pthread_create(&waiter_threads[i], &attrs, start_barrier_waiter_thread, params[i]); + usleep(10000); + } + for (uint i = 0; i < num_waiter_threads; i++) { + ASSERT_EQ(0, pthread_tryjoin_np(waiter_threads[i], NULL)); + delete params[i]; + } +} + +/** Start multiple threads, let them wait for a SyncBarrier, + * also have two threads registered as emitter. + * Let the first thread emit the barrier, assert the waiters did not unblock, + * then let the second thread emit. 
+ * This tests the fundamental difference to a SyncPoint: With a SyncPoint, + * wait() returns if the SyncPoint is emitted by one component. + * With a SyncBarrier, all registered emitters need to emit the SyncBarrier + * before wait() returns. + */ +TEST_F(SyncBarrierTest, WaitForAllEmitters) +{ + + string barrier_id = "/test/barrier"; + Emitter em1("emitter 1", barrier_id, manager); + Emitter em2("emitter 2", barrier_id, manager); + + RefPtr<SyncBarrier> barrier = manager->get_syncbarrier("main loop", barrier_id); + barrier->reset_emitters(); + + const uint num_waiter_threads = 50; + const uint num_wait_calls = 1; + pthread_t waiter_threads[num_waiter_threads]; + waiter_thread_params *params[num_waiter_threads]; + for (uint i = 0; i < num_waiter_threads; i++) { + params[i] = new waiter_thread_params(); + params[i]->manager = manager; + params[i]->thread_nr = i; + params[i]->num_wait_calls = num_wait_calls; + params[i]->sp_identifier = barrier_id; + pthread_create(&waiter_threads[i], &attrs, start_barrier_waiter_thread, params[i]); + usleep(10000); + } + + sleep(1); + for (uint i = 0; i < num_waiter_threads; i++) { + EXPECT_EQ(EBUSY, pthread_tryjoin_np(waiter_threads[i], NULL)); + } + + em1.emit(); + + sleep(1); + for (uint i = 0; i < num_waiter_threads; i++) { + EXPECT_EQ(EBUSY, pthread_tryjoin_np(waiter_threads[i], NULL)); + } + + em1.emit(); + + sleep(1); + for (uint i = 0; i < num_waiter_threads; i++) { + EXPECT_EQ(EBUSY, pthread_tryjoin_np(waiter_threads[i], NULL)); + } + + em2.emit(); + + sleep(1); + for (uint i = 0; i < num_waiter_threads; i++) { + ASSERT_EQ(0, pthread_tryjoin_np(waiter_threads[i], NULL)); + delete params[i]; + } +} + + +/** two barriers, emit the first one. 
Only the threads waiting on the first + * barrier should unblock + */ +TEST_F(SyncBarrierTest, BarriersAreIndependent) +{ + string barrier1_id = "/test/barrier1"; + string barrier2_id = "/test/barrier2"; + Emitter em1("em1", barrier1_id, manager); + Emitter em2("em2", barrier2_id, manager); + + RefPtr<SyncBarrier> barrier1 = manager->get_syncbarrier("main loop", + barrier1_id); + barrier1->reset_emitters(); + + RefPtr<SyncBarrier> barrier2 = manager->get_syncbarrier("main loop", + barrier2_id); + barrier2->reset_emitters(); + + const uint num_waiter_threads = 50; + const uint num_wait_calls = 1; + pthread_t waiter_threads1[num_waiter_threads]; + waiter_thread_params *params1[num_waiter_threads]; + for (uint i = 0; i < num_waiter_threads; i++) { + params1[i] = new waiter_thread_params(); + params1[i]->manager = manager; + params1[i]->thread_nr = i; + params1[i]->num_wait_calls = num_wait_calls; + params1[i]->sp_identifier = barrier1_id; + pthread_create(&waiter_threads1[i], &attrs, start_barrier_waiter_thread, + params1[i]); + usleep(10000); + } + + pthread_t waiter_threads2[num_waiter_threads]; + waiter_thread_params *params2[num_waiter_threads]; + for (uint i = 0; i < num_waiter_threads; i++) { + params2[i] = new waiter_thread_params(); + params2[i]->manager = manager; + params2[i]->thread_nr = i; + params2[i]->num_wait_calls = num_wait_calls; + params2[i]->sp_identifier = barrier2_id; + pthread_create(&waiter_threads2[i], &attrs, start_barrier_waiter_thread, + params2[i]); + usleep(10000); + } + + sleep(1); + for (uint i = 0; i < num_waiter_threads; i++) { + EXPECT_EQ(EBUSY, pthread_tryjoin_np(waiter_threads1[i], NULL)); + } + + for (uint i = 0; i < num_waiter_threads; i++) { + EXPECT_EQ(EBUSY, pthread_tryjoin_np(waiter_threads2[i], NULL)); + } + + em1.emit(); + + sleep(1); + for (uint i = 0; i < num_waiter_threads; i++) { + ASSERT_EQ(0, pthread_tryjoin_np(waiter_threads1[i], NULL)); + delete params1[i]; + } + + for (uint i = 0; i < num_waiter_threads; i++) { + 
EXPECT_EQ(EBUSY, pthread_tryjoin_np(waiter_threads2[i], NULL)); + } + + em2.emit(); + + sleep(1); + for (uint i = 0; i < num_waiter_threads; i++) { + ASSERT_EQ(0, pthread_tryjoin_np(waiter_threads2[i], NULL)); + delete params2[i]; + } +} + +/** Emit a barrier without registering */ +TEST_F(SyncBarrierTest, EmitWithoutRegister) +{ + string component = "emitter"; + RefPtr<SyncBarrier> barrier = manager->get_syncbarrier(component, "/test/barrier"); + ASSERT_THROW(barrier->emit(component), SyncBarrierNonEmitterCalledEmitException); +} + +/** Register multiple times */ +TEST_F(SyncBarrierTest, MultipleRegisterCalls) +{ + string component = "emitter"; + RefPtr<SyncBarrier> barrier = manager->get_syncbarrier(component, "/test/barrier"); + EXPECT_NO_THROW(barrier->register_emitter(component)); + ASSERT_THROW(barrier->register_emitter(component), SyncBarrierMultipleRegisterCallsException); +} + -- Fawkes Robotics Framework http://www.fawkesrobotics.org _______________________________________________ fawkes-commits mailing list fawkes-commits@lists.kbsg.rwth-aachen.de https://lists.kbsg.rwth-aachen.de/listinfo/fawkes-commits