Repository: mesos Updated Branches: refs/heads/master c9462f492 -> 62b472731
Changed dispatch to use callable-once functors. `dispatch` guarantees that the functor will be called at most once, and therefore it allows optimizations, such as moves of deferred objects. Review: https://reviews.apache.org/r/63634/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0d9ce98e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0d9ce98e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0d9ce98e Branch: refs/heads/master Commit: 0d9ce98e9df97be06144d2e29cf23a9c090a06b3 Parents: c9462f4 Author: Dmitry Zhuk <dz...@twopensource.com> Authored: Tue Dec 5 10:39:47 2017 -0800 Committer: Michael Park <mp...@apache.org> Committed: Tue Dec 5 10:56:14 2017 -0800 ---------------------------------------------------------------------- .../libprocess/include/process/dispatch.hpp | 127 ++++++++++--------- 3rdparty/libprocess/include/process/event.hpp | 4 +- 3rdparty/libprocess/src/process.cpp | 2 +- 3rdparty/libprocess/src/tests/process_tests.cpp | 18 +++ 4 files changed, 91 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0d9ce98e/3rdparty/libprocess/include/process/dispatch.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/dispatch.hpp b/3rdparty/libprocess/include/process/dispatch.hpp index 155f362..29b0082 100644 --- a/3rdparty/libprocess/include/process/dispatch.hpp +++ b/3rdparty/libprocess/include/process/dispatch.hpp @@ -64,7 +64,7 @@ namespace internal { // will probably change in the future to unique_ptr (or a variant). 
void dispatch( const UPID& pid, - const std::shared_ptr<std::function<void(ProcessBase*)>>& f, + const std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>>& f, const Option<const std::type_info*>& functionType = None()); @@ -84,11 +84,14 @@ struct Dispatch<void> template <typename F> void operator()(const UPID& pid, F&& f) { - std::shared_ptr<std::function<void(ProcessBase*)>> f_( - new std::function<void(ProcessBase*)>( - [=](ProcessBase*) { - f(); - })); + std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_( + new lambda::CallableOnce<void(ProcessBase*)>( + lambda::partial( + [](typename std::decay<F>::type&& f, ProcessBase*) { + std::move(f)(); + }, + std::forward<F>(f), + lambda::_1))); internal::dispatch(pid, f_); } @@ -107,11 +110,14 @@ struct Dispatch<Future<R>> { std::shared_ptr<Promise<R>> promise(new Promise<R>()); - std::shared_ptr<std::function<void(ProcessBase*)>> f_( - new std::function<void(ProcessBase*)>( - [=](ProcessBase*) { - promise->associate(f()); - })); + std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_( + new lambda::CallableOnce<void(ProcessBase*)>( + lambda::partial( + [=](typename std::decay<F>::type&& f, ProcessBase*) { + promise->associate(std::move(f)()); + }, + std::forward<F>(f), + lambda::_1))); internal::dispatch(pid, f_); @@ -131,11 +137,14 @@ struct Dispatch { std::shared_ptr<Promise<R>> promise(new Promise<R>()); - std::shared_ptr<std::function<void(ProcessBase*)>> f_( - new std::function<void(ProcessBase*)>( - [=](ProcessBase*) { - promise->set(f()); - })); + std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_( + new lambda::CallableOnce<void(ProcessBase*)>( + lambda::partial( + [=](typename std::decay<F>::type&& f, ProcessBase*) { + promise->set(std::move(f)()); + }, + std::forward<F>(f), + lambda::_1))); internal::dispatch(pid, f_); @@ -157,8 +166,8 @@ struct Dispatch template <typename T> void dispatch(const PID<T>& pid, void (T::*method)()) { - 
std::shared_ptr<std::function<void(ProcessBase*)>> f( - new std::function<void(ProcessBase*)>( + std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( + new lambda::CallableOnce<void(ProcessBase*)>( [=](ProcessBase* process) { assert(process != nullptr); T* t = dynamic_cast<T*>(process); @@ -187,7 +196,8 @@ void dispatch(const Process<T>* process, void (T::*method)()) // The following assumes base names for type and variable are `A` and `a`. #define FORWARD(Z, N, DATA) std::forward<A ## N>(a ## N) -#define DECL(Z, N, DATA) typename std::decay<A ## N>::type& a ## N +#define MOVE(Z, N, DATA) std::move(a ## N) +#define DECL(Z, N, DATA) typename std::decay<A ## N>::type&& a ## N #define TEMPLATE(Z, N, DATA) \ template <typename T, \ @@ -198,17 +208,17 @@ void dispatch(const Process<T>* process, void (T::*method)()) void (T::*method)(ENUM_PARAMS(N, P)), \ ENUM_BINARY_PARAMS(N, A, &&a)) \ { \ - std::shared_ptr<std::function<void(ProcessBase*)>> f( \ - new std::function<void(ProcessBase*)>( \ - std::bind([method](ENUM(N, DECL, _), \ - ProcessBase* process) { \ - assert(process != nullptr); \ - T* t = dynamic_cast<T*>(process); \ - assert(t != nullptr); \ - (t->*method)(ENUM_PARAMS(N, a)); \ - }, \ - ENUM(N, FORWARD, _), \ - lambda::_1))); \ + std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \ + new lambda::CallableOnce<void(ProcessBase*)>( \ + lambda::partial( \ + [method](ENUM(N, DECL, _), ProcessBase* process) { \ + assert(process != nullptr); \ + T* t = dynamic_cast<T*>(process); \ + assert(t != nullptr); \ + (t->*method)(ENUM(N, MOVE, _)); \ + }, \ + ENUM(N, FORWARD, _), \ + lambda::_1))); \ \ internal::dispatch(pid, f, &typeid(method)); \ } \ @@ -246,8 +256,8 @@ Future<R> dispatch(const PID<T>& pid, Future<R> (T::*method)()) { std::shared_ptr<Promise<R>> promise(new Promise<R>()); - std::shared_ptr<std::function<void(ProcessBase*)>> f( - new std::function<void(ProcessBase*)>( + std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( + new 
lambda::CallableOnce<void(ProcessBase*)>( [=](ProcessBase* process) { assert(process != nullptr); T* t = dynamic_cast<T*>(process); @@ -284,18 +294,19 @@ Future<R> dispatch(const Process<T>* process, Future<R> (T::*method)()) { \ std::shared_ptr<Promise<R>> promise(new Promise<R>()); \ \ - std::shared_ptr<std::function<void(ProcessBase*)>> f( \ - new std::function<void(ProcessBase*)>( \ - std::bind([promise, method](ENUM(N, DECL, _), \ - ProcessBase* process) { \ - assert(process != nullptr); \ - T* t = dynamic_cast<T*>(process); \ - assert(t != nullptr); \ - promise->associate( \ - (t->*method)(ENUM_PARAMS(N, a))); \ - }, \ - ENUM(N, FORWARD, _), \ - lambda::_1))); \ + std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \ + new lambda::CallableOnce<void(ProcessBase*)>( \ + lambda::partial( \ + [promise, method](ENUM(N, DECL, _), \ + ProcessBase* process) { \ + assert(process != nullptr); \ + T* t = dynamic_cast<T*>(process); \ + assert(t != nullptr); \ + promise->associate( \ + (t->*method)(ENUM(N, MOVE, _))); \ + }, \ + ENUM(N, FORWARD, _), \ + lambda::_1))); \ \ internal::dispatch(pid, f, &typeid(method)); \ \ @@ -337,8 +348,8 @@ Future<R> dispatch(const PID<T>& pid, R (T::*method)()) { std::shared_ptr<Promise<R>> promise(new Promise<R>()); - std::shared_ptr<std::function<void(ProcessBase*)>> f( - new std::function<void(ProcessBase*)>( + std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( + new lambda::CallableOnce<void(ProcessBase*)>( [=](ProcessBase* process) { assert(process != nullptr); T* t = dynamic_cast<T*>(process); @@ -375,17 +386,18 @@ Future<R> dispatch(const Process<T>* process, R (T::*method)()) { \ std::shared_ptr<Promise<R>> promise(new Promise<R>()); \ \ - std::shared_ptr<std::function<void(ProcessBase*)>> f( \ - new std::function<void(ProcessBase*)>( \ - std::bind([promise, method](ENUM(N, DECL, _), \ - ProcessBase* process) { \ - assert(process != nullptr); \ - T* t = dynamic_cast<T*>(process); \ - assert(t != nullptr); \ - 
promise->set((t->*method)(ENUM_PARAMS(N, a))); \ - }, \ - ENUM(N, FORWARD, _), \ - lambda::_1))); \ + std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \ + new lambda::CallableOnce<void(ProcessBase*)>( \ + lambda::partial( \ + [promise, method](ENUM(N, DECL, _), \ + ProcessBase* process) { \ + assert(process != nullptr); \ + T* t = dynamic_cast<T*>(process); \ + assert(t != nullptr); \ + promise->set((t->*method)(ENUM(N, MOVE, _))); \ + }, \ + ENUM(N, FORWARD, _), \ + lambda::_1))); \ \ internal::dispatch(pid, f, &typeid(method)); \ \ @@ -420,6 +432,7 @@ Future<R> dispatch(const Process<T>* process, R (T::*method)()) #undef TEMPLATE #undef DECL +#undef MOVE #undef FORWARD // We use partial specialization of http://git-wip-us.apache.org/repos/asf/mesos/blob/0d9ce98e/3rdparty/libprocess/include/process/event.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp index 4b785e3..76bcdb8 100644 --- a/3rdparty/libprocess/include/process/event.hpp +++ b/3rdparty/libprocess/include/process/event.hpp @@ -180,7 +180,7 @@ struct DispatchEvent : Event { DispatchEvent( const UPID& _pid, - const std::shared_ptr<lambda::function<void(ProcessBase*)>>& _f, + const std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>>& _f, const Option<const std::type_info*>& _functionType) : pid(_pid), f(_f), @@ -206,7 +206,7 @@ struct DispatchEvent : Event UPID pid; // Function to get invoked as a result of this dispatch event. 
- std::shared_ptr<lambda::function<void(ProcessBase*)>> f; + std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f; Option<const std::type_info*> functionType; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/0d9ce98e/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 2b17e25..f62df49 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -3918,7 +3918,7 @@ namespace internal { void dispatch( const UPID& pid, - const std::shared_ptr<lambda::function<void(ProcessBase*)>>& f, + const std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>>& f, const Option<const std::type_info*>& functionType) { process::initialize(); http://git-wip-us.apache.org/repos/asf/mesos/blob/0d9ce98e/3rdparty/libprocess/src/tests/process_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp index 45ddd17..4a3e3ca 100644 --- a/3rdparty/libprocess/src/tests/process_tests.cpp +++ b/3rdparty/libprocess/src/tests/process_tests.cpp @@ -146,6 +146,18 @@ TEST(ProcessTest, THREADSAFE_Spawn) } +struct MoveOnly +{ + MoveOnly() {} + + MoveOnly(const MoveOnly&) = delete; + MoveOnly(MoveOnly&&) = default; + + MoveOnly& operator=(const MoveOnly&) = delete; + MoveOnly& operator=(MoveOnly&&) = default; +}; + + class DispatchProcess : public Process<DispatchProcess> { public: @@ -154,6 +166,9 @@ public: MOCK_METHOD1(func2, Future<bool>(bool)); MOCK_METHOD1(func3, int(int)); MOCK_METHOD2(func4, Future<bool>(bool, int)); + + void func5(MoveOnly&& mo) { func5_(mo); } + MOCK_METHOD1(func5_, void(const MoveOnly&)); }; @@ -169,11 +184,14 @@ TEST(ProcessTest, THREADSAFE_Dispatch) EXPECT_CALL(process, func2(_)) .WillOnce(ReturnArg<0>()); + EXPECT_CALL(process, func5_(_)); + 
PID<DispatchProcess> pid = spawn(&process); ASSERT_FALSE(!pid); dispatch(pid, &DispatchProcess::func0); + dispatch(pid, &DispatchProcess::func5, MoveOnly()); Future<bool> future;