Replaced `std::shared_ptr` with `std::unique_ptr` in `dispatch`. Since `dispatch` can now handle move-only parameters, the `Promise` and the function object can each be wrapped in a `std::unique_ptr` for efficiency.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bca8c6a0 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bca8c6a0 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bca8c6a0 Branch: refs/heads/master Commit: bca8c6a05d03a2162c04703a9c1ac8172fdfae8a Parents: 7b0812e Author: Dmitry Zhuk <dz...@twopensource.com> Authored: Tue Dec 5 13:45:56 2017 -0800 Committer: Michael Park <mp...@apache.org> Committed: Tue Dec 5 14:08:36 2017 -0800 ---------------------------------------------------------------------- .../libprocess/include/process/dispatch.hpp | 133 +++++++++++-------- 3rdparty/libprocess/include/process/event.hpp | 6 +- 3rdparty/libprocess/src/process.cpp | 6 +- 3 files changed, 83 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/bca8c6a0/3rdparty/libprocess/include/process/dispatch.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/dispatch.hpp b/3rdparty/libprocess/include/process/dispatch.hpp index 29b0082..82432a7 100644 --- a/3rdparty/libprocess/include/process/dispatch.hpp +++ b/3rdparty/libprocess/include/process/dispatch.hpp @@ -14,7 +14,7 @@ #define __PROCESS_DISPATCH_HPP__ #include <functional> -#include <memory> // TODO(benh): Replace shared_ptr with unique_ptr. +#include <memory> #include <string> #include <process/process.hpp> @@ -60,11 +60,10 @@ namespace internal { // this routine does not expect anything in particular about the // specified function (second argument). The semantics are simple: the // function gets applied/invoked with the process as its first -// argument. Currently we wrap the function in a shared_ptr but this -// will probably change in the future to unique_ptr (or a variant). +// argument. 
void dispatch( const UPID& pid, - const std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>>& f, + std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f, const Option<const std::type_info*>& functionType = None()); @@ -84,7 +83,7 @@ struct Dispatch<void> template <typename F> void operator()(const UPID& pid, F&& f) { - std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_( + std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_( new lambda::CallableOnce<void(ProcessBase*)>( lambda::partial( [](typename std::decay<F>::type&& f, ProcessBase*) { @@ -93,7 +92,7 @@ struct Dispatch<void> std::forward<F>(f), lambda::_1))); - internal::dispatch(pid, f_); + internal::dispatch(pid, std::move(f_)); } }; @@ -108,20 +107,24 @@ struct Dispatch<Future<R>> template <typename F> Future<R> operator()(const UPID& pid, F&& f) { - std::shared_ptr<Promise<R>> promise(new Promise<R>()); + std::unique_ptr<Promise<R>> promise(new Promise<R>()); + Future<R> future = promise->future(); - std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_( + std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_( new lambda::CallableOnce<void(ProcessBase*)>( lambda::partial( - [=](typename std::decay<F>::type&& f, ProcessBase*) { + [](std::unique_ptr<Promise<R>> promise, + typename std::decay<F>::type&& f, + ProcessBase*) { promise->associate(std::move(f)()); }, + std::move(promise), std::forward<F>(f), lambda::_1))); - internal::dispatch(pid, f_); + internal::dispatch(pid, std::move(f_)); - return promise->future(); + return future; } }; @@ -135,20 +138,24 @@ struct Dispatch template <typename F> Future<R> operator()(const UPID& pid, F&& f) { - std::shared_ptr<Promise<R>> promise(new Promise<R>()); + std::unique_ptr<Promise<R>> promise(new Promise<R>()); + Future<R> future = promise->future(); - std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_( + std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_( new lambda::CallableOnce<void(ProcessBase*)>( 
lambda::partial( - [=](typename std::decay<F>::type&& f, ProcessBase*) { + [](std::unique_ptr<Promise<R>> promise, + typename std::decay<F>::type&& f, + ProcessBase*) { promise->set(std::move(f)()); }, + std::move(promise), std::forward<F>(f), lambda::_1))); - internal::dispatch(pid, f_); + internal::dispatch(pid, std::move(f_)); - return promise->future(); + return future; } }; @@ -166,7 +173,7 @@ struct Dispatch template <typename T> void dispatch(const PID<T>& pid, void (T::*method)()) { - std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( + std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( new lambda::CallableOnce<void(ProcessBase*)>( [=](ProcessBase* process) { assert(process != nullptr); @@ -175,7 +182,7 @@ void dispatch(const PID<T>& pid, void (T::*method)()) (t->*method)(); })); - internal::dispatch(pid, f, &typeid(method)); + internal::dispatch(pid, std::move(f), &typeid(method)); } template <typename T> @@ -208,7 +215,7 @@ void dispatch(const Process<T>* process, void (T::*method)()) void (T::*method)(ENUM_PARAMS(N, P)), \ ENUM_BINARY_PARAMS(N, A, &&a)) \ { \ - std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \ + std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \ new lambda::CallableOnce<void(ProcessBase*)>( \ lambda::partial( \ [method](ENUM(N, DECL, _), ProcessBase* process) { \ @@ -220,7 +227,7 @@ void dispatch(const Process<T>* process, void (T::*method)()) ENUM(N, FORWARD, _), \ lambda::_1))); \ \ - internal::dispatch(pid, f, &typeid(method)); \ + internal::dispatch(pid, std::move(f), &typeid(method)); \ } \ \ template <typename T, \ @@ -254,20 +261,24 @@ void dispatch(const Process<T>* process, void (T::*method)()) template <typename R, typename T> Future<R> dispatch(const PID<T>& pid, Future<R> (T::*method)()) { - std::shared_ptr<Promise<R>> promise(new Promise<R>()); + std::unique_ptr<Promise<R>> promise(new Promise<R>()); + Future<R> future = promise->future(); - 
std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( + std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( new lambda::CallableOnce<void(ProcessBase*)>( - [=](ProcessBase* process) { - assert(process != nullptr); - T* t = dynamic_cast<T*>(process); - assert(t != nullptr); - promise->associate((t->*method)()); - })); - - internal::dispatch(pid, f, &typeid(method)); - - return promise->future(); + lambda::partial( + [=](std::unique_ptr<Promise<R>> promise, ProcessBase* process) { + assert(process != nullptr); + T* t = dynamic_cast<T*>(process); + assert(t != nullptr); + promise->associate((t->*method)()); + }, + std::move(promise), + lambda::_1))); + + internal::dispatch(pid, std::move(f), &typeid(method)); + + return future; } template <typename R, typename T> @@ -292,25 +303,28 @@ Future<R> dispatch(const Process<T>* process, Future<R> (T::*method)()) Future<R> (T::*method)(ENUM_PARAMS(N, P)), \ ENUM_BINARY_PARAMS(N, A, &&a)) \ { \ - std::shared_ptr<Promise<R>> promise(new Promise<R>()); \ + std::unique_ptr<Promise<R>> promise(new Promise<R>()); \ + Future<R> future = promise->future(); \ \ - std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \ + std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \ new lambda::CallableOnce<void(ProcessBase*)>( \ lambda::partial( \ - [promise, method](ENUM(N, DECL, _), \ - ProcessBase* process) { \ + [method](std::unique_ptr<Promise<R>> promise, \ + ENUM(N, DECL, _), \ + ProcessBase* process) { \ assert(process != nullptr); \ T* t = dynamic_cast<T*>(process); \ assert(t != nullptr); \ promise->associate( \ (t->*method)(ENUM(N, MOVE, _))); \ }, \ + std::move(promise), \ ENUM(N, FORWARD, _), \ lambda::_1))); \ \ - internal::dispatch(pid, f, &typeid(method)); \ + internal::dispatch(pid, std::move(f), &typeid(method)); \ \ - return promise->future(); \ + return future; \ } \ \ template <typename R, \ @@ -346,20 +360,24 @@ Future<R> dispatch(const Process<T>* process, Future<R> (T::*method)()) 
template <typename R, typename T> Future<R> dispatch(const PID<T>& pid, R (T::*method)()) { - std::shared_ptr<Promise<R>> promise(new Promise<R>()); + std::unique_ptr<Promise<R>> promise(new Promise<R>()); + Future<R> future = promise->future(); - std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( + std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( new lambda::CallableOnce<void(ProcessBase*)>( - [=](ProcessBase* process) { - assert(process != nullptr); - T* t = dynamic_cast<T*>(process); - assert(t != nullptr); - promise->set((t->*method)()); - })); - - internal::dispatch(pid, f, &typeid(method)); - - return promise->future(); + lambda::partial( + [=](std::unique_ptr<Promise<R>> promise, ProcessBase* process) { + assert(process != nullptr); + T* t = dynamic_cast<T*>(process); + assert(t != nullptr); + promise->set((t->*method)()); + }, + std::move(promise), + lambda::_1))); + + internal::dispatch(pid, std::move(f), &typeid(method)); + + return future; } template <typename R, typename T> @@ -384,24 +402,27 @@ Future<R> dispatch(const Process<T>* process, R (T::*method)()) R (T::*method)(ENUM_PARAMS(N, P)), \ ENUM_BINARY_PARAMS(N, A, &&a)) \ { \ - std::shared_ptr<Promise<R>> promise(new Promise<R>()); \ + std::unique_ptr<Promise<R>> promise(new Promise<R>()); \ + Future<R> future = promise->future(); \ \ - std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \ + std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \ new lambda::CallableOnce<void(ProcessBase*)>( \ lambda::partial( \ - [promise, method](ENUM(N, DECL, _), \ - ProcessBase* process) { \ + [method](std::unique_ptr<Promise<R>> promise, \ + ENUM(N, DECL, _), \ + ProcessBase* process) { \ assert(process != nullptr); \ T* t = dynamic_cast<T*>(process); \ assert(t != nullptr); \ promise->set((t->*method)(ENUM(N, MOVE, _))); \ }, \ + std::move(promise), \ ENUM(N, FORWARD, _), \ lambda::_1))); \ \ - internal::dispatch(pid, f, &typeid(method)); \ + internal::dispatch(pid, 
std::move(f), &typeid(method)); \ \ - return promise->future(); \ + return future; \ } \ \ template <typename R, \ http://git-wip-us.apache.org/repos/asf/mesos/blob/bca8c6a0/3rdparty/libprocess/include/process/event.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp index 2e93428..ec64eb7 100644 --- a/3rdparty/libprocess/include/process/event.hpp +++ b/3rdparty/libprocess/include/process/event.hpp @@ -179,10 +179,10 @@ struct DispatchEvent : Event { DispatchEvent( const UPID& _pid, - const std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>>& _f, + std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> _f, const Option<const std::type_info*>& _functionType) : pid(_pid), - f(_f), + f(std::move(_f)), functionType(_functionType) {} @@ -205,7 +205,7 @@ struct DispatchEvent : Event UPID pid; // Function to get invoked as a result of this dispatch event. - std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f; + std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f; Option<const std::type_info*> functionType; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/bca8c6a0/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index f62df49..75cf1d3 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -2977,7 +2977,7 @@ void ProcessManager::cleanup(ProcessBase* process) // Remove help strings for all installed routes for this process. dispatch(help, &Help::remove, process->pid.id); - // Possible gate non-libprocess threads are waiting at. + // Possible gate non-libprocess threads are waiting at. std::shared_ptr<Gate> gate = process->gate; // Remove process. 
@@ -3918,12 +3918,12 @@ namespace internal { void dispatch( const UPID& pid, - const std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>>& f, + std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f, const Option<const std::type_info*>& functionType) { process::initialize(); - DispatchEvent* event = new DispatchEvent(pid, f, functionType); + DispatchEvent* event = new DispatchEvent(pid, std::move(f), functionType); process_manager->deliver(pid, event, __process__); }