Repository: mesos
Updated Branches:
  refs/heads/master c9462f492 -> 62b472731


Changed `dispatch` to use `CallableOnce` functors.

`dispatch` guarantees that the functor will be called at most once, which
allows optimizations such as moving deferred objects instead of copying them.

Review: https://reviews.apache.org/r/63634/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0d9ce98e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0d9ce98e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0d9ce98e

Branch: refs/heads/master
Commit: 0d9ce98e9df97be06144d2e29cf23a9c090a06b3
Parents: c9462f4
Author: Dmitry Zhuk <dz...@twopensource.com>
Authored: Tue Dec 5 10:39:47 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Tue Dec 5 10:56:14 2017 -0800

----------------------------------------------------------------------
 .../libprocess/include/process/dispatch.hpp     | 127 ++++++++++---------
 3rdparty/libprocess/include/process/event.hpp   |   4 +-
 3rdparty/libprocess/src/process.cpp             |   2 +-
 3rdparty/libprocess/src/tests/process_tests.cpp |  18 +++
 4 files changed, 91 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0d9ce98e/3rdparty/libprocess/include/process/dispatch.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/dispatch.hpp b/3rdparty/libprocess/include/process/dispatch.hpp
index 155f362..29b0082 100644
--- a/3rdparty/libprocess/include/process/dispatch.hpp
+++ b/3rdparty/libprocess/include/process/dispatch.hpp
@@ -64,7 +64,7 @@ namespace internal {
 // will probably change in the future to unique_ptr (or a variant).
 void dispatch(
     const UPID& pid,
-    const std::shared_ptr<std::function<void(ProcessBase*)>>& f,
+    const std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>>& f,
     const Option<const std::type_info*>& functionType = None());
 
 
@@ -84,11 +84,14 @@ struct Dispatch<void>
   template <typename F>
   void operator()(const UPID& pid, F&& f)
   {
-    std::shared_ptr<std::function<void(ProcessBase*)>> f_(
-        new std::function<void(ProcessBase*)>(
-            [=](ProcessBase*) {
-              f();
-            }));
+    std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_(
+        new lambda::CallableOnce<void(ProcessBase*)>(
+            lambda::partial(
+                [](typename std::decay<F>::type&& f, ProcessBase*) {
+                  std::move(f)();
+                },
+                std::forward<F>(f),
+                lambda::_1)));
 
     internal::dispatch(pid, f_);
   }
@@ -107,11 +110,14 @@ struct Dispatch<Future<R>>
   {
     std::shared_ptr<Promise<R>> promise(new Promise<R>());
 
-    std::shared_ptr<std::function<void(ProcessBase*)>> f_(
-        new std::function<void(ProcessBase*)>(
-            [=](ProcessBase*) {
-              promise->associate(f());
-            }));
+    std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_(
+        new lambda::CallableOnce<void(ProcessBase*)>(
+            lambda::partial(
+                [=](typename std::decay<F>::type&& f, ProcessBase*) {
+                  promise->associate(std::move(f)());
+                },
+                std::forward<F>(f),
+                lambda::_1)));
 
     internal::dispatch(pid, f_);
 
@@ -131,11 +137,14 @@ struct Dispatch
   {
     std::shared_ptr<Promise<R>> promise(new Promise<R>());
 
-    std::shared_ptr<std::function<void(ProcessBase*)>> f_(
-        new std::function<void(ProcessBase*)>(
-            [=](ProcessBase*) {
-              promise->set(f());
-            }));
+    std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_(
+        new lambda::CallableOnce<void(ProcessBase*)>(
+            lambda::partial(
+                [=](typename std::decay<F>::type&& f, ProcessBase*) {
+                  promise->set(std::move(f)());
+                },
+                std::forward<F>(f),
+                lambda::_1)));
 
     internal::dispatch(pid, f_);
 
@@ -157,8 +166,8 @@ struct Dispatch
 template <typename T>
 void dispatch(const PID<T>& pid, void (T::*method)())
 {
-  std::shared_ptr<std::function<void(ProcessBase*)>> f(
-      new std::function<void(ProcessBase*)>(
+  std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f(
+      new lambda::CallableOnce<void(ProcessBase*)>(
           [=](ProcessBase* process) {
             assert(process != nullptr);
             T* t = dynamic_cast<T*>(process);
@@ -187,7 +196,8 @@ void dispatch(const Process<T>* process, void (T::*method)())
 
 // The following assumes base names for type and variable are `A` and `a`.
 #define FORWARD(Z, N, DATA) std::forward<A ## N>(a ## N)
-#define DECL(Z, N, DATA) typename std::decay<A ## N>::type& a ## N
+#define MOVE(Z, N, DATA) std::move(a ## N)
+#define DECL(Z, N, DATA) typename std::decay<A ## N>::type&& a ## N
 
 #define TEMPLATE(Z, N, DATA)                                            \
   template <typename T,                                                 \
@@ -198,17 +208,17 @@ void dispatch(const Process<T>* process, void (T::*method)())
       void (T::*method)(ENUM_PARAMS(N, P)),                             \
       ENUM_BINARY_PARAMS(N, A, &&a))                                    \
   {                                                                     \
-    std::shared_ptr<std::function<void(ProcessBase*)>> f(               \
-        new std::function<void(ProcessBase*)>(                          \
-            std::bind([method](ENUM(N, DECL, _),                        \
-                               ProcessBase* process) {                  \
-                        assert(process != nullptr);                     \
-                        T* t = dynamic_cast<T*>(process);               \
-                        assert(t != nullptr);                           \
-                        (t->*method)(ENUM_PARAMS(N, a));                \
-                      },                                                \
-                      ENUM(N, FORWARD, _),                              \
-                      lambda::_1)));                                    \
+    std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f(        \
+        new lambda::CallableOnce<void(ProcessBase*)>(                   \
+            lambda::partial(                                            \
+                [method](ENUM(N, DECL, _), ProcessBase* process) {      \
+                  assert(process != nullptr);                           \
+                  T* t = dynamic_cast<T*>(process);                     \
+                  assert(t != nullptr);                                 \
+                  (t->*method)(ENUM(N, MOVE, _));                       \
+                },                                                      \
+                ENUM(N, FORWARD, _),                                    \
+                lambda::_1)));                                          \
                                                                         \
     internal::dispatch(pid, f, &typeid(method));                        \
   }                                                                     \
@@ -246,8 +256,8 @@ Future<R> dispatch(const PID<T>& pid, Future<R> (T::*method)())
 {
   std::shared_ptr<Promise<R>> promise(new Promise<R>());
 
-  std::shared_ptr<std::function<void(ProcessBase*)>> f(
-      new std::function<void(ProcessBase*)>(
+  std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f(
+      new lambda::CallableOnce<void(ProcessBase*)>(
           [=](ProcessBase* process) {
             assert(process != nullptr);
             T* t = dynamic_cast<T*>(process);
@@ -284,18 +294,19 @@ Future<R> dispatch(const Process<T>* process, Future<R> (T::*method)())
   {                                                                     \
     std::shared_ptr<Promise<R>> promise(new Promise<R>());              \
                                                                         \
-    std::shared_ptr<std::function<void(ProcessBase*)>> f(               \
-        new std::function<void(ProcessBase*)>(                          \
-            std::bind([promise, method](ENUM(N, DECL, _),               \
-                                        ProcessBase* process) {         \
-                        assert(process != nullptr);                     \
-                        T* t = dynamic_cast<T*>(process);               \
-                        assert(t != nullptr);                           \
-                        promise->associate(                             \
-                            (t->*method)(ENUM_PARAMS(N, a)));           \
-                      },                                                \
-                      ENUM(N, FORWARD, _),                              \
-                      lambda::_1)));                                    \
+    std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f(        \
+        new lambda::CallableOnce<void(ProcessBase*)>(                   \
+            lambda::partial(                                            \
+                [promise, method](ENUM(N, DECL, _),                     \
+                                  ProcessBase* process) {               \
+                  assert(process != nullptr);                           \
+                  T* t = dynamic_cast<T*>(process);                     \
+                  assert(t != nullptr);                                 \
+                  promise->associate(                                   \
+                      (t->*method)(ENUM(N, MOVE, _)));                  \
+                },                                                      \
+                ENUM(N, FORWARD, _),                                    \
+                lambda::_1)));                                          \
                                                                         \
     internal::dispatch(pid, f, &typeid(method));                        \
                                                                         \
@@ -337,8 +348,8 @@ Future<R> dispatch(const PID<T>& pid, R (T::*method)())
 {
   std::shared_ptr<Promise<R>> promise(new Promise<R>());
 
-  std::shared_ptr<std::function<void(ProcessBase*)>> f(
-      new std::function<void(ProcessBase*)>(
+  std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f(
+      new lambda::CallableOnce<void(ProcessBase*)>(
           [=](ProcessBase* process) {
             assert(process != nullptr);
             T* t = dynamic_cast<T*>(process);
@@ -375,17 +386,18 @@ Future<R> dispatch(const Process<T>* process, R (T::*method)())
   {                                                                     \
     std::shared_ptr<Promise<R>> promise(new Promise<R>());              \
                                                                         \
-    std::shared_ptr<std::function<void(ProcessBase*)>> f(               \
-        new std::function<void(ProcessBase*)>(                          \
-            std::bind([promise, method](ENUM(N, DECL, _),               \
-                                        ProcessBase* process) {         \
-                        assert(process != nullptr);                     \
-                        T* t = dynamic_cast<T*>(process);               \
-                        assert(t != nullptr);                           \
-                        promise->set((t->*method)(ENUM_PARAMS(N, a)));  \
-                      },                                                \
-                      ENUM(N, FORWARD, _),                              \
-                      lambda::_1)));                                    \
+    std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f(        \
+        new lambda::CallableOnce<void(ProcessBase*)>(                   \
+            lambda::partial(                                            \
+                [promise, method](ENUM(N, DECL, _),                     \
+                                  ProcessBase* process) {               \
+                  assert(process != nullptr);                           \
+                  T* t = dynamic_cast<T*>(process);                     \
+                  assert(t != nullptr);                                 \
+                  promise->set((t->*method)(ENUM(N, MOVE, _)));         \
+                },                                                      \
+                ENUM(N, FORWARD, _),                                    \
+                lambda::_1)));                                          \
                                                                         \
     internal::dispatch(pid, f, &typeid(method));                        \
                                                                         \
@@ -420,6 +432,7 @@ Future<R> dispatch(const Process<T>* process, R (T::*method)())
 #undef TEMPLATE
 
 #undef DECL
+#undef MOVE
 #undef FORWARD
 
 // We use partial specialization of

http://git-wip-us.apache.org/repos/asf/mesos/blob/0d9ce98e/3rdparty/libprocess/include/process/event.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
index 4b785e3..76bcdb8 100644
--- a/3rdparty/libprocess/include/process/event.hpp
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -180,7 +180,7 @@ struct DispatchEvent : Event
 {
   DispatchEvent(
       const UPID& _pid,
-      const std::shared_ptr<lambda::function<void(ProcessBase*)>>& _f,
+      const std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>>& _f,
       const Option<const std::type_info*>& _functionType)
     : pid(_pid),
       f(_f),
@@ -206,7 +206,7 @@ struct DispatchEvent : Event
   UPID pid;
 
   // Function to get invoked as a result of this dispatch event.
-  std::shared_ptr<lambda::function<void(ProcessBase*)>> f;
+  std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>> f;
 
   Option<const std::type_info*> functionType;
 };

http://git-wip-us.apache.org/repos/asf/mesos/blob/0d9ce98e/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 2b17e25..f62df49 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -3918,7 +3918,7 @@ namespace internal {
 
 void dispatch(
     const UPID& pid,
-    const std::shared_ptr<lambda::function<void(ProcessBase*)>>& f,
+    const std::shared_ptr<lambda::CallableOnce<void(ProcessBase*)>>& f,
     const Option<const std::type_info*>& functionType)
 {
   process::initialize();

http://git-wip-us.apache.org/repos/asf/mesos/blob/0d9ce98e/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 45ddd17..4a3e3ca 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -146,6 +146,18 @@ TEST(ProcessTest, THREADSAFE_Spawn)
 }
 
 
+struct MoveOnly
+{
+  MoveOnly() {}
+
+  MoveOnly(const MoveOnly&) = delete;
+  MoveOnly(MoveOnly&&) = default;
+
+  MoveOnly& operator=(const MoveOnly&) = delete;
+  MoveOnly& operator=(MoveOnly&&) = default;
+};
+
+
 class DispatchProcess : public Process<DispatchProcess>
 {
 public:
@@ -154,6 +166,9 @@ public:
   MOCK_METHOD1(func2, Future<bool>(bool));
   MOCK_METHOD1(func3, int(int));
   MOCK_METHOD2(func4, Future<bool>(bool, int));
+
+  void func5(MoveOnly&& mo) { func5_(mo); }
+  MOCK_METHOD1(func5_, void(const MoveOnly&));
 };
 
 
@@ -169,11 +184,14 @@ TEST(ProcessTest, THREADSAFE_Dispatch)
   EXPECT_CALL(process, func2(_))
     .WillOnce(ReturnArg<0>());
 
+  EXPECT_CALL(process, func5_(_));
+
   PID<DispatchProcess> pid = spawn(&process);
 
   ASSERT_FALSE(!pid);
 
   dispatch(pid, &DispatchProcess::func0);
+  dispatch(pid, &DispatchProcess::func5, MoveOnly());
 
   Future<bool> future;
 

Reply via email to