Added `CallableOnce` support in `Future`.

`Future` guarantees that callbacks are called at most once, so it can
use `lambda::CallableOnce` to explicitly declare this and to allow the
corresponding optimizations with moves.

Review: https://reviews.apache.org/r/63638/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8014e3f9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8014e3f9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8014e3f9

Branch: refs/heads/master
Commit: 8014e3f9e1838745a6f3af7c1e2a557bd74349b0
Parents: 09b72e9
Author: Dmitry Zhuk <dz...@twopensource.com>
Authored: Tue Dec 5 11:20:55 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Tue Dec 5 11:20:55 2017 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp | 240 ++++++++++++--------
 3rdparty/libprocess/src/tests/future_tests.cpp |  49 ++++
 2 files changed, 189 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8014e3f9/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp 
b/3rdparty/libprocess/include/process/future.hpp
index d48dfd7..cad4c5e 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -159,12 +159,12 @@ public:
 
   // Type of the callback functions that can get invoked when the
   // future gets set, fails, or is discarded.
-  typedef lambda::function<void()> AbandonedCallback;
-  typedef lambda::function<void()> DiscardCallback;
-  typedef lambda::function<void(const T&)> ReadyCallback;
-  typedef lambda::function<void(const std::string&)> FailedCallback;
-  typedef lambda::function<void()> DiscardedCallback;
-  typedef lambda::function<void(const Future<T>&)> AnyCallback;
+  typedef lambda::CallableOnce<void()> AbandonedCallback;
+  typedef lambda::CallableOnce<void()> DiscardCallback;
+  typedef lambda::CallableOnce<void(const T&)> ReadyCallback;
+  typedef lambda::CallableOnce<void(const std::string&)> FailedCallback;
+  typedef lambda::CallableOnce<void()> DiscardedCallback;
+  typedef lambda::CallableOnce<void(const Future<T>&)> AnyCallback;
 
   // Installs callbacks for the specified events and returns a const
   // reference to 'this' in order to easily support chaining.
@@ -181,40 +181,43 @@ public:
   template <typename F>
   const Future<T>& onAbandoned(_Deferred<F>&& deferred) const
   {
-    return onAbandoned(deferred.operator std::function<void()>());
+    return onAbandoned(
+        std::move(deferred).operator lambda::CallableOnce<void()>());
   }
 
   template <typename F>
   const Future<T>& onDiscard(_Deferred<F>&& deferred) const
   {
-    return onDiscard(std::move(deferred).operator std::function<void()>());
+    return onDiscard(
+        std::move(deferred).operator lambda::CallableOnce<void()>());
   }
 
   template <typename F>
   const Future<T>& onReady(_Deferred<F>&& deferred) const
   {
     return onReady(
-        std::move(deferred).operator std::function<void(const T&)>());
+        std::move(deferred).operator lambda::CallableOnce<void(const T&)>());
   }
 
   template <typename F>
   const Future<T>& onFailed(_Deferred<F>&& deferred) const
   {
-    return onFailed(
-        std::move(deferred).operator std::function<void(const 
std::string&)>());
+    return onFailed(std::move(deferred)
+        .operator lambda::CallableOnce<void(const std::string&)>());
   }
 
   template <typename F>
   const Future<T>& onDiscarded(_Deferred<F>&& deferred) const
   {
-    return onDiscarded(std::move(deferred).operator std::function<void()>());
+    return onDiscarded(
+        std::move(deferred).operator lambda::CallableOnce<void()>());
   }
 
   template <typename F>
   const Future<T>& onAny(_Deferred<F>&& deferred) const
   {
-    return onAny(
-        std::move(deferred).operator std::function<void(const Future<T>&)>());
+    return onAny(std::move(deferred)
+        .operator lambda::CallableOnce<void(const Future<T>&)>());
   }
 
 private:
@@ -233,10 +236,13 @@ private:
   template <typename F, typename = typename result_of<F(const T&)>::type>
   const Future<T>& onReady(F&& f, Prefer) const
   {
-    return onReady(std::function<void(const T&)>(
-        [=](const T& t) mutable {
-          f(t);
-        }));
+    return onReady(lambda::CallableOnce<void(const T&)>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f, const T& t) {
+              std::move(f)(t);
+            },
+            std::forward<F>(f),
+            lambda::_1)));
   }
 
   // This is the less preferred `onReady`, we prefer the `onReady` method which
@@ -254,19 +260,25 @@ private:
           F>::type()>::type>
   const Future<T>& onReady(F&& f, LessPrefer) const
   {
-    return onReady(std::function<void(const T&)>(
-        [=](const T&) mutable {
-          f();
-        }));
+    return onReady(lambda::CallableOnce<void(const T&)>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f, const T&) {
+              std::move(f)();
+            },
+            std::forward<F>(f),
+            lambda::_1)));
   }
 
   template <typename F, typename = typename result_of<F(const 
std::string&)>::type> // NOLINT(whitespace/line_length)
   const Future<T>& onFailed(F&& f, Prefer) const
   {
-    return onFailed(std::function<void(const std::string&)>(
-        [=](const std::string& message) mutable {
-          f(message);
-        }));
+    return onFailed(lambda::CallableOnce<void(const std::string&)>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f, const std::string& message) {
+              std::move(f)(message);
+            },
+            std::forward<F>(f),
+            lambda::_1)));
   }
 
   // Refer to the less preferred version of `onReady` for why these SFINAE
@@ -278,19 +290,25 @@ private:
           F>::type()>::type>
   const Future<T>& onFailed(F&& f, LessPrefer) const
   {
-    return onFailed(std::function<void(const std::string&)>(
-        [=](const std::string&) mutable {
-          f();
-        }));
+    return onFailed(lambda::CallableOnce<void(const std::string&)>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f, const std::string&) mutable {
+              std::move(f)();
+            },
+            std::forward<F>(f),
+            lambda::_1)));
   }
 
   template <typename F, typename = typename result_of<F(const 
Future<T>&)>::type> // NOLINT(whitespace/line_length)
   const Future<T>& onAny(F&& f, Prefer) const
   {
-    return onAny(std::function<void(const Future<T>&)>(
-        [=](const Future<T>& future) mutable {
-          f(future);
-        }));
+    return onAny(lambda::CallableOnce<void(const Future<T>&)>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f, const Future<T>& future) {
+              std::move(f)(future);
+            },
+            std::forward<F>(f),
+            lambda::_1)));
   }
 
   // Refer to the less preferred version of `onReady` for why these SFINAE
@@ -302,29 +320,36 @@ private:
           F>::type()>::type>
   const Future<T>& onAny(F&& f, LessPrefer) const
   {
-    return onAny(std::function<void(const Future<T>&)>(
-        [=](const Future<T>&) mutable {
-          f();
-        }));
+    return onAny(lambda::CallableOnce<void(const Future<T>&)>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f, const Future<T>&) {
+              std::move(f)();
+            },
+            std::forward<F>(f),
+            lambda::_1)));
   }
 
 public:
   template <typename F>
   const Future<T>& onAbandoned(F&& f) const
   {
-    return onAbandoned(std::function<void()>(
-        [=]() mutable {
-          f();
-        }));
+    return onAbandoned(lambda::CallableOnce<void()>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f) {
+              std::move(f)();
+            },
+            std::forward<F>(f))));
   }
 
   template <typename F>
   const Future<T>& onDiscard(F&& f) const
   {
-    return onDiscard(std::function<void()>(
-        [=]() mutable {
-          f();
-        }));
+    return onDiscard(lambda::CallableOnce<void()>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f) {
+              std::move(f)();
+            },
+            std::forward<F>(f))));
   }
 
   template <typename F>
@@ -342,10 +367,12 @@ public:
   template <typename F>
   const Future<T>& onDiscarded(F&& f) const
   {
-    return onDiscarded(std::function<void()>(
-        [=]() mutable {
-          f();
-        }));
+    return onDiscarded(lambda::CallableOnce<void()>(
+        lambda::partial(
+            [](typename std::decay<F>::type&& f) {
+              std::move(f)();
+            },
+            std::forward<F>(f))));
   }
 
   template <typename F>
@@ -358,22 +385,23 @@ public:
   // and associates the result of the callback with the future that is
   // returned to the caller (which may be of a different type).
   template <typename X>
-  Future<X> then(lambda::function<Future<X>(const T&)> f) const;
+  Future<X> then(lambda::CallableOnce<Future<X>(const T&)> f) const;
 
   template <typename X>
-  Future<X> then(lambda::function<X(const T&)> f) const;
+  Future<X> then(lambda::CallableOnce<X(const T&)> f) const;
 
   template <typename X>
-  Future<X> then(lambda::function<Future<X>()> f) const
+  Future<X> then(lambda::CallableOnce<Future<X>()> f) const
   {
-    return then(
-        lambda::function<Future<X>(const T&)>(lambda::bind(std::move(f))));
+    return then(lambda::CallableOnce<Future<X>(const T&)>(
+        lambda::partial(std::move(f))));
   }
 
   template <typename X>
-  Future<X> then(lambda::function<X()> f) const
+  Future<X> then(lambda::CallableOnce<X()> f) const
   {
-    return then(lambda::function<X(const T&)>(lambda::bind(std::move(f))));
+    return then(lambda::CallableOnce<X(const T&)>(
+        lambda::partial(std::move(f))));
   }
 
 private:
@@ -385,7 +413,8 @@ private:
   {
     // note the then<X> is necessary to not have an infinite loop with
     // then(F&& f)
-    return then<X>(std::move(f).operator std::function<Future<X>(const T&)>());
+    return then<X>(
+        std::move(f).operator lambda::CallableOnce<Future<X>(const T&)>());
   }
 
   // Refer to the less preferred version of `onReady` for why these SFINAE
@@ -398,13 +427,14 @@ private:
               F>::type()>::type>::type>
   Future<X> then(_Deferred<F>&& f, LessPrefer) const
   {
-    return then<X>(std::move(f).operator std::function<Future<X>()>());
+    return then<X>(std::move(f).operator lambda::CallableOnce<Future<X>()>());
   }
 
   template <typename F, typename X = typename internal::unwrap<typename 
result_of<F(const T&)>::type>::type> // NOLINT(whitespace/line_length)
   Future<X> then(F&& f, Prefer) const
   {
-    return then<X>(std::function<Future<X>(const T&)>(std::forward<F>(f)));
+    return then<X>(
+        lambda::CallableOnce<Future<X>(const T&)>(std::forward<F>(f)));
   }
 
   // Refer to the less preferred version of `onReady` for why these SFINAE
@@ -417,7 +447,7 @@ private:
               F>::type()>::type>::type>
   Future<X> then(F&& f, LessPrefer) const
   {
-    return then<X>(std::function<Future<X>()>(std::forward<F>(f)));
+    return then<X>(lambda::CallableOnce<Future<X>()>(std::forward<F>(f)));
   }
 
 public:
@@ -449,7 +479,7 @@ public:
   {
     return recover(
         std::move(deferred)
-          .operator std::function<Future<T>(const Future<T>&)>());
+          .operator lambda::CallableOnce<Future<T>(const Future<T>&)>());
   }
 
   // TODO(benh): Considering adding a `rescue` function for rescuing
@@ -458,7 +488,7 @@ public:
   // Installs callbacks that get executed if this future completes
   // because it failed.
   Future<T> repair(
-      const lambda::function<Future<T>(const Future<T>&)>& f) const;
+      lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
 
   // TODO(benh): Add overloads of 'repair' that don't require passing
   // in a function that takes the 'const Future<T>&' parameter and use
@@ -472,7 +502,7 @@ public:
   // was called on the returned future.
   Future<T> after(
       const Duration& duration,
-      const lambda::function<Future<T>(const Future<T>&)>& f) const;
+      lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
 
   // TODO(benh): Add overloads of 'after' that don't require passing
   // in a function that takes the 'const Future<T>&' parameter and use
@@ -584,10 +614,10 @@ namespace internal {
 //
 // TODO(*): Invoke callbacks in another execution context.
 template <typename C, typename... Arguments>
-void run(const std::vector<C>& callbacks, Arguments&&... arguments)
+void run(std::vector<C>&& callbacks, Arguments&&... arguments)
 {
   for (size_t i = 0; i < callbacks.size(); ++i) {
-    callbacks[i](std::forward<Arguments>(arguments)...);
+    std::move(callbacks[i])(std::forward<Arguments>(arguments)...);
   }
 }
 
@@ -995,8 +1025,8 @@ bool Promise<T>::discard(Future<T> future)
     // ourselves from one of the callbacks erroneously deleting the
     // future. In `Future::_set()` and `Future::fail()` we have to
     // explicitly take a copy to protect ourselves.
-    internal::run(future.data->onDiscardedCallbacks);
-    internal::run(future.data->onAnyCallbacks, future);
+    internal::run(std::move(future.data->onDiscardedCallbacks));
+    internal::run(std::move(future.data->onAnyCallbacks), future);
 
     future.data->clearAllCallbacks();
   }
@@ -1157,7 +1187,7 @@ bool Future<T>::discard()
   // future. The callbacks get destroyed when we exit from the
   // function.
   if (result) {
-    internal::run(callbacks);
+    internal::run(std::move(callbacks));
   }
 
   return result;
@@ -1183,7 +1213,7 @@ bool Future<T>::abandon(bool propagating)
   // Invoke all callbacks. The callbacks get destroyed when we exit
   // from the function.
   if (result) {
-    internal::run(callbacks);
+    internal::run(std::move(callbacks));
   }
 
   return result;
@@ -1329,7 +1359,7 @@ const Future<T>& 
Future<T>::onAbandoned(AbandonedCallback&& callback) const
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(); // NOLINT(misc-use-after-move)
+    std::move(callback)(); // NOLINT(misc-use-after-move)
   }
 
   return *this;
@@ -1351,7 +1381,7 @@ const Future<T>& Future<T>::onDiscard(DiscardCallback&& 
callback) const
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(); // NOLINT(misc-use-after-move)
+    std::move(callback)(); // NOLINT(misc-use-after-move)
   }
 
   return *this;
@@ -1373,7 +1403,7 @@ const Future<T>& Future<T>::onReady(ReadyCallback&& 
callback) const
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(data->result.get()); // NOLINT(misc-use-after-move)
+    std::move(callback)(data->result.get()); // NOLINT(misc-use-after-move)
   }
 
   return *this;
@@ -1395,7 +1425,7 @@ const Future<T>& Future<T>::onFailed(FailedCallback&& 
callback) const
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(data->result.error()); // NOLINT(misc-use-after-move)
+    std::move(callback)(data->result.error()); // NOLINT(misc-use-after-move)
   }
 
   return *this;
@@ -1417,7 +1447,7 @@ const Future<T>& 
Future<T>::onDiscarded(DiscardedCallback&& callback) const
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(); // NOLINT(misc-use-after-move)
+    std::move(callback)(); // NOLINT(misc-use-after-move)
   }
 
   return *this;
@@ -1439,7 +1469,7 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) 
const
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(*this); // NOLINT(misc-use-after-move)
+    std::move(callback)(*this); // NOLINT(misc-use-after-move)
   }
 
   return *this;
@@ -1451,7 +1481,7 @@ namespace internal {
 // from the function 'then' whose parameter 'f' doesn't return a
 // Future since the compiler can't properly infer otherwise.
 template <typename T, typename X>
-void thenf(const lambda::function<Future<X>(const T&)>& f,
+void thenf(lambda::CallableOnce<Future<X>(const T&)>&& f,
            const std::shared_ptr<Promise<X>>& promise,
            const Future<T>& future)
 {
@@ -1459,7 +1489,7 @@ void thenf(const lambda::function<Future<X>(const T&)>& f,
     if (future.hasDiscard()) {
       promise->discard();
     } else {
-      promise->associate(f(future.get()));
+      promise->associate(std::move(f)(future.get()));
     }
   } else if (future.isFailed()) {
     promise->fail(future.failure());
@@ -1470,7 +1500,7 @@ void thenf(const lambda::function<Future<X>(const T&)>& f,
 
 
 template <typename T, typename X>
-void then(const lambda::function<X(const T&)>& f,
+void then(lambda::CallableOnce<X(const T&)>&& f,
           const std::shared_ptr<Promise<X>>& promise,
           const Future<T>& future)
 {
@@ -1478,7 +1508,7 @@ void then(const lambda::function<X(const T&)>& f,
     if (future.hasDiscard()) {
       promise->discard();
     } else {
-      promise->set(f(future.get()));
+      promise->set(std::move(f)(future.get()));
     }
   } else if (future.isFailed()) {
     promise->fail(future.failure());
@@ -1490,13 +1520,13 @@ void then(const lambda::function<X(const T&)>& f,
 
 template <typename T>
 void repair(
-    const lambda::function<Future<T>(const Future<T>&)>& f,
+    lambda::CallableOnce<Future<T>(const Future<T>&)>&& f,
     const std::shared_ptr<Promise<T>>& promise,
     const Future<T>& future)
 {
   CHECK(!future.isPending());
   if (future.isFailed()) {
-    promise->associate(f(future));
+    promise->associate(std::move(f)(future));
   } else {
     promise->associate(future);
   }
@@ -1505,7 +1535,7 @@ void repair(
 
 template <typename T>
 void expired(
-    const lambda::function<Future<T>(const Future<T>&)>& f,
+    const std::shared_ptr<lambda::CallableOnce<Future<T>(const Future<T>&)>>& 
f,
     const std::shared_ptr<Latch>& latch,
     const std::shared_ptr<Promise<T>>& promise,
     const std::shared_ptr<Option<Timer>>& timer,
@@ -1525,7 +1555,7 @@ void expired(
     // if the future has been discarded and rather than hiding a
     // non-deterministic bug we always call 'f' if the timer has
     // expired.
-    promise->associate(f(future));
+    promise->associate(std::move(*f)(future));
   }
 }
 
@@ -1559,12 +1589,12 @@ void after(
 
 template <typename T>
 template <typename X>
-Future<X> Future<T>::then(lambda::function<Future<X>(const T&)> f) const
+Future<X> Future<T>::then(lambda::CallableOnce<Future<X>(const T&)> f) const
 {
   std::shared_ptr<Promise<X>> promise(new Promise<X>());
 
-  lambda::function<void(const Future<T>&)> thenf =
-    lambda::bind(&internal::thenf<T, X>, std::move(f), promise, lambda::_1);
+  lambda::CallableOnce<void(const Future<T>&)> thenf =
+    lambda::partial(&internal::thenf<T, X>, std::move(f), promise, lambda::_1);
 
   onAny(std::move(thenf));
 
@@ -1583,12 +1613,12 @@ Future<X> 
Future<T>::then(lambda::function<Future<X>(const T&)> f) const
 
 template <typename T>
 template <typename X>
-Future<X> Future<T>::then(lambda::function<X(const T&)> f) const
+Future<X> Future<T>::then(lambda::CallableOnce<X(const T&)> f) const
 {
   std::shared_ptr<Promise<X>> promise(new Promise<X>());
 
-  lambda::function<void(const Future<T>&)> then =
-    lambda::bind(&internal::then<T, X>, std::move(f), promise, lambda::_1);
+  lambda::CallableOnce<void(const Future<T>&)> then =
+    lambda::partial(&internal::then<T, X>, std::move(f), promise, lambda::_1);
 
   onAny(std::move(then));
 
@@ -1613,6 +1643,11 @@ Future<T> Future<T>::recover(F&& f) const
 
   const Future<T> future = *this;
 
+  typedef decltype(std::move(f)(future)) R;
+
+  std::shared_ptr<lambda::CallableOnce<R(const Future<T>&)>> callable(
+      new lambda::CallableOnce<R(const Future<T>&)>(std::move(f)));
+
   onAny([=]() {
     if (future.isDiscarded() || future.isFailed()) {
       // We reset `discard` so that if a future gets returned from
@@ -1624,7 +1659,7 @@ Future<T> Future<T>::recover(F&& f) const
         promise->f.data->discard = false;
       }
 
-      promise->set(f(future));
+      promise->set(std::move(*callable)(future));
     } else {
       promise->associate(future);
     }
@@ -1635,7 +1670,7 @@ Future<T> Future<T>::recover(F&& f) const
     synchronized (promise->f.data->lock) {
       promise->f.data->discard = false;
     }
-    promise->set(f(future));
+    promise->set(std::move(*callable)(future));
   });
 
   // Propagate discarding up the chain. To avoid cyclic dependencies,
@@ -1649,11 +1684,12 @@ Future<T> Future<T>::recover(F&& f) const
 
 template <typename T>
 Future<T> Future<T>::repair(
-    const lambda::function<Future<T>(const Future<T>&)>& f) const
+    lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
 {
   std::shared_ptr<Promise<T>> promise(new Promise<T>());
 
-  onAny(lambda::bind(&internal::repair<T>, f, promise, lambda::_1));
+  onAny(
+      lambda::partial(&internal::repair<T>, std::move(f), promise, 
lambda::_1));
 
   onAbandoned([=]() {
     promise->future().abandon();
@@ -1671,7 +1707,7 @@ Future<T> Future<T>::repair(
 template <typename T>
 Future<T> Future<T>::after(
     const Duration& duration,
-    const lambda::function<Future<T>(const Future<T>&)>& f) const
+    lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
 {
   // TODO(benh): Using a Latch here but Once might be cleaner.
   // Unfortunately, Once depends on Future so we can't easily use it
@@ -1694,6 +1730,9 @@ Future<T> Future<T>::after(
   // issues we have to worry about.
   std::shared_ptr<Option<Timer>> timer(new Option<Timer>());
 
+  typedef lambda::CallableOnce<Future<T>(const Future<T>&)> F;
+  std::shared_ptr<F> callable(new F(std::move(f)));
+
   // Set up a timer to invoke the callback if this future has not
   // completed. Note that we do not pass a weak reference for this
   // future as we don't want the future to get cleaned up and then
@@ -1706,7 +1745,8 @@ Future<T> Future<T>::after(
   // force the deallocation of our copy of the timer).
   *timer = Clock::timer(
       duration,
-      lambda::bind(&internal::expired<T>, f, latch, promise, timer, *this));
+      lambda::bind(&internal::expired<T>, callable, latch, promise, timer,
+          *this));
 
   onAny(lambda::bind(&internal::after<T>, latch, promise, timer, lambda::_1));
 
@@ -1758,8 +1798,8 @@ bool Future<T>::_set(U&& u)
     // Grab a copy of `data` just in case invoking the callbacks
     // erroneously attempts to delete this future.
     std::shared_ptr<typename Future<T>::Data> copy = data;
-    internal::run(copy->onReadyCallbacks, copy->result.get());
-    internal::run(copy->onAnyCallbacks, *this);
+    internal::run(std::move(copy->onReadyCallbacks), copy->result.get());
+    internal::run(std::move(copy->onAnyCallbacks), *this);
 
     copy->clearAllCallbacks();
   }
@@ -1788,8 +1828,8 @@ bool Future<T>::fail(const std::string& _message)
     // Grab a copy of `data` just in case invoking the callbacks
     // erroneously attempts to delete this future.
     std::shared_ptr<typename Future<T>::Data> copy = data;
-    internal::run(copy->onFailedCallbacks, copy->result.error());
-    internal::run(copy->onAnyCallbacks, *this);
+    internal::run(std::move(copy->onFailedCallbacks), copy->result.error());
+    internal::run(std::move(copy->onAnyCallbacks), *this);
 
     copy->clearAllCallbacks();
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/8014e3f9/3rdparty/libprocess/src/tests/future_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/future_tests.cpp 
b/3rdparty/libprocess/src/tests/future_tests.cpp
index bb7a1c8..3dadd72 100644
--- a/3rdparty/libprocess/src/tests/future_tests.cpp
+++ b/3rdparty/libprocess/src/tests/future_tests.cpp
@@ -210,6 +210,55 @@ TEST(FutureTest, Then)
 }
 
 
+TEST(FutureTest, CallableOnce)
+{
+  Promise<Nothing> promise;
+  promise.set(Nothing());
+
+  Future<int> future = promise.future()
+    .then(lambda::partial(
+        [](std::unique_ptr<int>&& o) {
+          return *o;
+        },
+        std::unique_ptr<int>(new int(42))));
+
+  ASSERT_TRUE(future.isReady());
+  EXPECT_EQ(42, future.get());
+
+  int n = 0;
+  future = promise.future()
+    .onReady(lambda::partial(
+        [&n](std::unique_ptr<int> o) {
+          n += *o;
+        },
+        std::unique_ptr<int>(new int(1))))
+    .onAny(lambda::partial(
+        [&n](std::unique_ptr<int>&& o) {
+          n += *o;
+        },
+        std::unique_ptr<int>(new int(10))))
+    .onFailed(lambda::partial(
+        [&n](const std::unique_ptr<int>& o) {
+          n += *o;
+        },
+        std::unique_ptr<int>(new int(100))))
+    .onDiscard(lambda::partial(
+        [&n](std::unique_ptr<int>&& o) {
+          n += *o;
+        },
+        std::unique_ptr<int>(new int(1000))))
+    .onDiscarded(lambda::partial(
+        [&n](std::unique_ptr<int>&& o) {
+          n += *o;
+        },
+        std::unique_ptr<int>(new int(10000))))
+    .then([&n]() { return n; });
+
+  ASSERT_TRUE(future.isReady());
+  EXPECT_EQ(11, future.get());
+}
+
+
 Future<int> repair(const Future<int>& future)
 {
   EXPECT_TRUE(future.isFailed());

Reply via email to