Added `CallableOnce` support in `Future`. `Future` guarantees that callbacks are called at most once, so it can use `lambda::CallableOnce` to expicitly declare this, and allow corresponding optimizations with moves.
Review: https://reviews.apache.org/r/63638/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8014e3f9 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8014e3f9 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8014e3f9 Branch: refs/heads/master Commit: 8014e3f9e1838745a6f3af7c1e2a557bd74349b0 Parents: 09b72e9 Author: Dmitry Zhuk <dz...@twopensource.com> Authored: Tue Dec 5 11:20:55 2017 -0800 Committer: Michael Park <mp...@apache.org> Committed: Tue Dec 5 11:20:55 2017 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/future.hpp | 240 ++++++++++++-------- 3rdparty/libprocess/src/tests/future_tests.cpp | 49 ++++ 2 files changed, 189 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/8014e3f9/3rdparty/libprocess/include/process/future.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp index d48dfd7..cad4c5e 100644 --- a/3rdparty/libprocess/include/process/future.hpp +++ b/3rdparty/libprocess/include/process/future.hpp @@ -159,12 +159,12 @@ public: // Type of the callback functions that can get invoked when the // future gets set, fails, or is discarded. - typedef lambda::function<void()> AbandonedCallback; - typedef lambda::function<void()> DiscardCallback; - typedef lambda::function<void(const T&)> ReadyCallback; - typedef lambda::function<void(const std::string&)> FailedCallback; - typedef lambda::function<void()> DiscardedCallback; - typedef lambda::function<void(const Future<T>&)> AnyCallback; + typedef lambda::CallableOnce<void()> AbandonedCallback; + typedef lambda::CallableOnce<void()> DiscardCallback; + typedef lambda::CallableOnce<void(const T&)> ReadyCallback; + typedef lambda::CallableOnce<void(const std::string&)> FailedCallback; + typedef lambda::CallableOnce<void()> DiscardedCallback; + typedef lambda::CallableOnce<void(const Future<T>&)> AnyCallback; // Installs callbacks for the specified events and returns a const // reference to 'this' in order to easily support chaining. @@ -181,40 +181,43 @@ public: template <typename F> const Future<T>& onAbandoned(_Deferred<F>&& deferred) const { - return onAbandoned(deferred.operator std::function<void()>()); + return onAbandoned( + std::move(deferred).operator lambda::CallableOnce<void()>()); } template <typename F> const Future<T>& onDiscard(_Deferred<F>&& deferred) const { - return onDiscard(std::move(deferred).operator std::function<void()>()); + return onDiscard( + std::move(deferred).operator lambda::CallableOnce<void()>()); } template <typename F> const Future<T>& onReady(_Deferred<F>&& deferred) const { return onReady( - std::move(deferred).operator std::function<void(const T&)>()); + std::move(deferred).operator lambda::CallableOnce<void(const T&)>()); } template <typename F> const Future<T>& onFailed(_Deferred<F>&& deferred) const { - return onFailed( - std::move(deferred).operator std::function<void(const std::string&)>()); + return onFailed(std::move(deferred) + .operator lambda::CallableOnce<void(const std::string&)>()); } template <typename F> const Future<T>& onDiscarded(_Deferred<F>&& deferred) const { - return onDiscarded(std::move(deferred).operator std::function<void()>()); + return onDiscarded( + std::move(deferred).operator lambda::CallableOnce<void()>()); } template <typename F> const Future<T>& onAny(_Deferred<F>&& deferred) const { - return onAny( - std::move(deferred).operator std::function<void(const Future<T>&)>()); + return onAny(std::move(deferred) + .operator lambda::CallableOnce<void(const Future<T>&)>()); } private: @@ -233,10 +236,13 @@ private: template <typename F, typename = typename result_of<F(const T&)>::type> const Future<T>& onReady(F&& f, Prefer) const { - return onReady(std::function<void(const T&)>( - [=](const T& t) mutable { - f(t); - })); + return onReady(lambda::CallableOnce<void(const T&)>( + lambda::partial( + [](typename std::decay<F>::type&& f, const T& t) { + std::move(f)(t); + }, + std::forward<F>(f), + lambda::_1))); } // This is the less preferred `onReady`, we prefer the `onReady` method which @@ -254,19 +260,25 @@ private: F>::type()>::type> const Future<T>& onReady(F&& f, LessPrefer) const { - return onReady(std::function<void(const T&)>( - [=](const T&) mutable { - f(); - })); + return onReady(lambda::CallableOnce<void(const T&)>( + lambda::partial( + [](typename std::decay<F>::type&& f, const T&) { + std::move(f)(); + }, + std::forward<F>(f), + lambda::_1))); } template <typename F, typename = typename result_of<F(const std::string&)>::type> // NOLINT(whitespace/line_length) const Future<T>& onFailed(F&& f, Prefer) const { - return onFailed(std::function<void(const std::string&)>( - [=](const std::string& message) mutable { - f(message); - })); + return onFailed(lambda::CallableOnce<void(const std::string&)>( + lambda::partial( + [](typename std::decay<F>::type&& f, const std::string& message) { + std::move(f)(message); + }, + std::forward<F>(f), + lambda::_1))); } // Refer to the less preferred version of `onReady` for why these SFINAE @@ -278,19 +290,25 @@ private: F>::type()>::type> const Future<T>& onFailed(F&& f, LessPrefer) const { - return onFailed(std::function<void(const std::string&)>( - [=](const std::string&) mutable { - f(); - })); + return onFailed(lambda::CallableOnce<void(const std::string&)>( + lambda::partial( + [](typename std::decay<F>::type&& f, const std::string&) mutable { + std::move(f)(); + }, + std::forward<F>(f), + lambda::_1))); } template <typename F, typename = typename result_of<F(const Future<T>&)>::type> // NOLINT(whitespace/line_length) const Future<T>& onAny(F&& f, Prefer) const { - return onAny(std::function<void(const Future<T>&)>( - [=](const Future<T>& future) mutable { - f(future); - })); + return onAny(lambda::CallableOnce<void(const Future<T>&)>( + lambda::partial( + [](typename std::decay<F>::type&& f, const Future<T>& future) { + std::move(f)(future); + }, + std::forward<F>(f), + lambda::_1))); } // Refer to the less preferred version of `onReady` for why these SFINAE @@ -302,29 +320,36 @@ private: F>::type()>::type> const Future<T>& onAny(F&& f, LessPrefer) const { - return onAny(std::function<void(const Future<T>&)>( - [=](const Future<T>&) mutable { - f(); - })); + return onAny(lambda::CallableOnce<void(const Future<T>&)>( + lambda::partial( + [](typename std::decay<F>::type&& f, const Future<T>&) { + std::move(f)(); + }, + std::forward<F>(f), + lambda::_1))); } public: template <typename F> const Future<T>& onAbandoned(F&& f) const { - return onAbandoned(std::function<void()>( - [=]() mutable { - f(); - })); + return onAbandoned(lambda::CallableOnce<void()>( + lambda::partial( + [](typename std::decay<F>::type&& f) { + std::move(f)(); + }, + std::forward<F>(f)))); } template <typename F> const Future<T>& onDiscard(F&& f) const { - return onDiscard(std::function<void()>( - [=]() mutable { - f(); - })); + return onDiscard(lambda::CallableOnce<void()>( + lambda::partial( + [](typename std::decay<F>::type&& f) { + std::move(f)(); + }, + std::forward<F>(f)))); } template <typename F> @@ -342,10 +367,12 @@ public: template <typename F> const Future<T>& onDiscarded(F&& f) const { - return onDiscarded(std::function<void()>( - [=]() mutable { - f(); - })); + return onDiscarded(lambda::CallableOnce<void()>( + lambda::partial( + [](typename std::decay<F>::type&& f) { + std::move(f)(); + }, + std::forward<F>(f)))); } template <typename F> @@ -358,22 +385,23 @@ public: // and associates the result of the callback with the future that is // returned to the caller (which may be of a different type). template <typename X> - Future<X> then(lambda::function<Future<X>(const T&)> f) const; + Future<X> then(lambda::CallableOnce<Future<X>(const T&)> f) const; template <typename X> - Future<X> then(lambda::function<X(const T&)> f) const; + Future<X> then(lambda::CallableOnce<X(const T&)> f) const; template <typename X> - Future<X> then(lambda::function<Future<X>()> f) const + Future<X> then(lambda::CallableOnce<Future<X>()> f) const { - return then( - lambda::function<Future<X>(const T&)>(lambda::bind(std::move(f)))); + return then(lambda::CallableOnce<Future<X>(const T&)>( + lambda::partial(std::move(f)))); } template <typename X> - Future<X> then(lambda::function<X()> f) const + Future<X> then(lambda::CallableOnce<X()> f) const { - return then(lambda::function<X(const T&)>(lambda::bind(std::move(f)))); + return then(lambda::CallableOnce<X(const T&)>( + lambda::partial(std::move(f)))); } private: @@ -385,7 +413,8 @@ private: { // note the then<X> is necessary to not have an infinite loop with // then(F&& f) - return then<X>(std::move(f).operator std::function<Future<X>(const T&)>()); + return then<X>( + std::move(f).operator lambda::CallableOnce<Future<X>(const T&)>()); } // Refer to the less preferred version of `onReady` for why these SFINAE @@ -398,13 +427,14 @@ private: F>::type()>::type>::type> Future<X> then(_Deferred<F>&& f, LessPrefer) const { - return then<X>(std::move(f).operator std::function<Future<X>()>()); + return then<X>(std::move(f).operator lambda::CallableOnce<Future<X>()>()); } template <typename F, typename X = typename internal::unwrap<typename result_of<F(const T&)>::type>::type> // NOLINT(whitespace/line_length) Future<X> then(F&& f, Prefer) const { - return then<X>(std::function<Future<X>(const T&)>(std::forward<F>(f))); + return then<X>( + lambda::CallableOnce<Future<X>(const T&)>(std::forward<F>(f))); } // Refer to the less preferred version of `onReady` for why these SFINAE @@ -417,7 +447,7 @@ private: F>::type()>::type>::type> Future<X> then(F&& f, LessPrefer) const { - return then<X>(std::function<Future<X>()>(std::forward<F>(f))); + return then<X>(lambda::CallableOnce<Future<X>()>(std::forward<F>(f))); } public: @@ -449,7 +479,7 @@ public: { return recover( std::move(deferred) - .operator std::function<Future<T>(const Future<T>&)>()); + .operator lambda::CallableOnce<Future<T>(const Future<T>&)>()); } // TODO(benh): Considering adding a `rescue` function for rescuing @@ -458,7 +488,7 @@ public: // Installs callbacks that get executed if this future completes // because it failed. Future<T> repair( - const lambda::function<Future<T>(const Future<T>&)>& f) const; + lambda::CallableOnce<Future<T>(const Future<T>&)> f) const; // TODO(benh): Add overloads of 'repair' that don't require passing // in a function that takes the 'const Future<T>&' parameter and use @@ -472,7 +502,7 @@ public: // was called on the returned future. Future<T> after( const Duration& duration, - const lambda::function<Future<T>(const Future<T>&)>& f) const; + lambda::CallableOnce<Future<T>(const Future<T>&)> f) const; // TODO(benh): Add overloads of 'after' that don't require passing // in a function that takes the 'const Future<T>&' parameter and use @@ -584,10 +614,10 @@ namespace internal { // // TODO(*): Invoke callbacks in another execution context. template <typename C, typename... Arguments> -void run(const std::vector<C>& callbacks, Arguments&&... arguments) +void run(std::vector<C>&& callbacks, Arguments&&... arguments) { for (size_t i = 0; i < callbacks.size(); ++i) { - callbacks[i](std::forward<Arguments>(arguments)...); + std::move(callbacks[i])(std::forward<Arguments>(arguments)...); } } @@ -995,8 +1025,8 @@ bool Promise<T>::discard(Future<T> future) // ourselves from one of the callbacks erroneously deleting the // future. In `Future::_set()` and `Future::fail()` we have to // explicitly take a copy to protect ourselves. - internal::run(future.data->onDiscardedCallbacks); - internal::run(future.data->onAnyCallbacks, future); + internal::run(std::move(future.data->onDiscardedCallbacks)); + internal::run(std::move(future.data->onAnyCallbacks), future); future.data->clearAllCallbacks(); } @@ -1157,7 +1187,7 @@ bool Future<T>::discard() // future. The callbacks get destroyed when we exit from the // function. if (result) { - internal::run(callbacks); + internal::run(std::move(callbacks)); } return result; @@ -1183,7 +1213,7 @@ bool Future<T>::abandon(bool propagating) // Invoke all callbacks. The callbacks get destroyed when we exit // from the function. if (result) { - internal::run(callbacks); + internal::run(std::move(callbacks)); } return result; @@ -1329,7 +1359,7 @@ const Future<T>& Future<T>::onAbandoned(AbandonedCallback&& callback) const // TODO(*): Invoke callback in another execution context. if (run) { - callback(); // NOLINT(misc-use-after-move) + std::move(callback)(); // NOLINT(misc-use-after-move) } return *this; @@ -1351,7 +1381,7 @@ const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const // TODO(*): Invoke callback in another execution context. if (run) { - callback(); // NOLINT(misc-use-after-move) + std::move(callback)(); // NOLINT(misc-use-after-move) } return *this; @@ -1373,7 +1403,7 @@ const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const // TODO(*): Invoke callback in another execution context. if (run) { - callback(data->result.get()); // NOLINT(misc-use-after-move) + std::move(callback)(data->result.get()); // NOLINT(misc-use-after-move) } return *this; @@ -1395,7 +1425,7 @@ const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const // TODO(*): Invoke callback in another execution context. if (run) { - callback(data->result.error()); // NOLINT(misc-use-after-move) + std::move(callback)(data->result.error()); // NOLINT(misc-use-after-move) } return *this; @@ -1417,7 +1447,7 @@ const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const // TODO(*): Invoke callback in another execution context. if (run) { - callback(); // NOLINT(misc-use-after-move) + std::move(callback)(); // NOLINT(misc-use-after-move) } return *this; @@ -1439,7 +1469,7 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const // TODO(*): Invoke callback in another execution context. if (run) { - callback(*this); // NOLINT(misc-use-after-move) + std::move(callback)(*this); // NOLINT(misc-use-after-move) } return *this; @@ -1451,7 +1481,7 @@ namespace internal { // from the function 'then' whose parameter 'f' doesn't return a // Future since the compiler can't properly infer otherwise. template <typename T, typename X> -void thenf(const lambda::function<Future<X>(const T&)>& f, +void thenf(lambda::CallableOnce<Future<X>(const T&)>&& f, const std::shared_ptr<Promise<X>>& promise, const Future<T>& future) { @@ -1459,7 +1489,7 @@ void thenf(const lambda::function<Future<X>(const T&)>& f, if (future.hasDiscard()) { promise->discard(); } else { - promise->associate(f(future.get())); + promise->associate(std::move(f)(future.get())); } } else if (future.isFailed()) { promise->fail(future.failure()); @@ -1470,7 +1500,7 @@ void thenf(const lambda::function<Future<X>(const T&)>& f, template <typename T, typename X> -void then(const lambda::function<X(const T&)>& f, +void then(lambda::CallableOnce<X(const T&)>&& f, const std::shared_ptr<Promise<X>>& promise, const Future<T>& future) { @@ -1478,7 +1508,7 @@ void then(const lambda::function<X(const T&)>& f, if (future.hasDiscard()) { promise->discard(); } else { - promise->set(f(future.get())); + promise->set(std::move(f)(future.get())); } } else if (future.isFailed()) { promise->fail(future.failure()); @@ -1490,13 +1520,13 @@ void then(const lambda::function<X(const T&)>& f, template <typename T> void repair( - const lambda::function<Future<T>(const Future<T>&)>& f, + lambda::CallableOnce<Future<T>(const Future<T>&)>&& f, const std::shared_ptr<Promise<T>>& promise, const Future<T>& future) { CHECK(!future.isPending()); if (future.isFailed()) { - promise->associate(f(future)); + promise->associate(std::move(f)(future)); } else { promise->associate(future); } @@ -1505,7 +1535,7 @@ void repair( template <typename T> void expired( - const lambda::function<Future<T>(const Future<T>&)>& f, + const std::shared_ptr<lambda::CallableOnce<Future<T>(const Future<T>&)>>& f, const std::shared_ptr<Latch>& latch, const std::shared_ptr<Promise<T>>& promise, const std::shared_ptr<Option<Timer>>& timer, @@ -1525,7 +1555,7 @@ void expired( // if the future has been discarded and rather than hiding a // non-deterministic bug we always call 'f' if the timer has // expired. - promise->associate(f(future)); + promise->associate(std::move(*f)(future)); } } @@ -1559,12 +1589,12 @@ void after( template <typename T> template <typename X> -Future<X> Future<T>::then(lambda::function<Future<X>(const T&)> f) const +Future<X> Future<T>::then(lambda::CallableOnce<Future<X>(const T&)> f) const { std::shared_ptr<Promise<X>> promise(new Promise<X>()); - lambda::function<void(const Future<T>&)> thenf = - lambda::bind(&internal::thenf<T, X>, std::move(f), promise, lambda::_1); + lambda::CallableOnce<void(const Future<T>&)> thenf = + lambda::partial(&internal::thenf<T, X>, std::move(f), promise, lambda::_1); onAny(std::move(thenf)); @@ -1583,12 +1613,12 @@ Future<X> Future<T>::then(lambda::function<Future<X>(const T&)> f) const template <typename T> template <typename X> -Future<X> Future<T>::then(lambda::function<X(const T&)> f) const +Future<X> Future<T>::then(lambda::CallableOnce<X(const T&)> f) const { std::shared_ptr<Promise<X>> promise(new Promise<X>()); - lambda::function<void(const Future<T>&)> then = - lambda::bind(&internal::then<T, X>, std::move(f), promise, lambda::_1); + lambda::CallableOnce<void(const Future<T>&)> then = + lambda::partial(&internal::then<T, X>, std::move(f), promise, lambda::_1); onAny(std::move(then)); @@ -1613,6 +1643,11 @@ Future<T> Future<T>::recover(F&& f) const const Future<T> future = *this; + typedef decltype(std::move(f)(future)) R; + + std::shared_ptr<lambda::CallableOnce<R(const Future<T>&)>> callable( + new lambda::CallableOnce<R(const Future<T>&)>(std::move(f))); + onAny([=]() { if (future.isDiscarded() || future.isFailed()) { // We reset `discard` so that if a future gets returned from @@ -1624,7 +1659,7 @@ Future<T> Future<T>::recover(F&& f) const promise->f.data->discard = false; } - promise->set(f(future)); + promise->set(std::move(*callable)(future)); } else { promise->associate(future); } @@ -1635,7 +1670,7 @@ Future<T> Future<T>::recover(F&& f) const synchronized (promise->f.data->lock) { promise->f.data->discard = false; } - promise->set(f(future)); + promise->set(std::move(*callable)(future)); }); // Propagate discarding up the chain. To avoid cyclic dependencies, @@ -1649,11 +1684,12 @@ Future<T> Future<T>::recover(F&& f) const template <typename T> Future<T> Future<T>::repair( - const lambda::function<Future<T>(const Future<T>&)>& f) const + lambda::CallableOnce<Future<T>(const Future<T>&)> f) const { std::shared_ptr<Promise<T>> promise(new Promise<T>()); - onAny(lambda::bind(&internal::repair<T>, f, promise, lambda::_1)); + onAny( + lambda::partial(&internal::repair<T>, std::move(f), promise, lambda::_1)); onAbandoned([=]() { promise->future().abandon(); @@ -1671,7 +1707,7 @@ Future<T> Future<T>::repair( template <typename T> Future<T> Future<T>::after( const Duration& duration, - const lambda::function<Future<T>(const Future<T>&)>& f) const + lambda::CallableOnce<Future<T>(const Future<T>&)> f) const { // TODO(benh): Using a Latch here but Once might be cleaner. // Unfortunately, Once depends on Future so we can't easily use it @@ -1694,6 +1730,9 @@ Future<T> Future<T>::after( // issues we have to worry about. std::shared_ptr<Option<Timer>> timer(new Option<Timer>()); + typedef lambda::CallableOnce<Future<T>(const Future<T>&)> F; + std::shared_ptr<F> callable(new F(std::move(f))); + // Set up a timer to invoke the callback if this future has not // completed. Note that we do not pass a weak reference for this // future as we don't want the future to get cleaned up and then @@ -1706,7 +1745,8 @@ Future<T> Future<T>::after( // force the deallocation of our copy of the timer). *timer = Clock::timer( duration, - lambda::bind(&internal::expired<T>, f, latch, promise, timer, *this)); + lambda::bind(&internal::expired<T>, callable, latch, promise, timer, + *this)); onAny(lambda::bind(&internal::after<T>, latch, promise, timer, lambda::_1)); @@ -1758,8 +1798,8 @@ bool Future<T>::_set(U&& u) // Grab a copy of `data` just in case invoking the callbacks // erroneously attempts to delete this future. std::shared_ptr<typename Future<T>::Data> copy = data; - internal::run(copy->onReadyCallbacks, copy->result.get()); - internal::run(copy->onAnyCallbacks, *this); + internal::run(std::move(copy->onReadyCallbacks), copy->result.get()); + internal::run(std::move(copy->onAnyCallbacks), *this); copy->clearAllCallbacks(); } @@ -1788,8 +1828,8 @@ bool Future<T>::fail(const std::string& _message) // Grab a copy of `data` just in case invoking the callbacks // erroneously attempts to delete this future. std::shared_ptr<typename Future<T>::Data> copy = data; - internal::run(copy->onFailedCallbacks, copy->result.error()); - internal::run(copy->onAnyCallbacks, *this); + internal::run(std::move(copy->onFailedCallbacks), copy->result.error()); + internal::run(std::move(copy->onAnyCallbacks), *this); copy->clearAllCallbacks(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/8014e3f9/3rdparty/libprocess/src/tests/future_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/future_tests.cpp b/3rdparty/libprocess/src/tests/future_tests.cpp index bb7a1c8..3dadd72 100644 --- a/3rdparty/libprocess/src/tests/future_tests.cpp +++ b/3rdparty/libprocess/src/tests/future_tests.cpp @@ -210,6 +210,55 @@ TEST(FutureTest, Then) } +TEST(FutureTest, CallableOnce) +{ + Promise<Nothing> promise; + promise.set(Nothing()); + + Future<int> future = promise.future() + .then(lambda::partial( + [](std::unique_ptr<int>&& o) { + return *o; + }, + std::unique_ptr<int>(new int(42)))); + + ASSERT_TRUE(future.isReady()); + EXPECT_EQ(42, future.get()); + + int n = 0; + future = promise.future() + .onReady(lambda::partial( + [&n](std::unique_ptr<int> o) { + n += *o; + }, + std::unique_ptr<int>(new int(1)))) + .onAny(lambda::partial( + [&n](std::unique_ptr<int>&& o) { + n += *o; + }, + std::unique_ptr<int>(new int(10)))) + .onFailed(lambda::partial( + [&n](const std::unique_ptr<int>& o) { + n += *o; + }, + std::unique_ptr<int>(new int(100)))) + .onDiscard(lambda::partial( + [&n](std::unique_ptr<int>&& o) { + n += *o; + }, + std::unique_ptr<int>(new int(1000)))) + .onDiscarded(lambda::partial( + [&n](std::unique_ptr<int>&& o) { + n += *o; + }, + std::unique_ptr<int>(new int(10000)))) + .then([&n]() { return n; }); + + ASSERT_TRUE(future.isReady()); + EXPECT_EQ(11, future.get()); +} + + Future<int> repair(const Future<int>& future) { EXPECT_TRUE(future.isFailed());