PROTON-1481: [C++ binding] Solidify the proton::work_queue API - Allow for complete deprecation of the strange C++03 proton::void_function0 class. It can still be used in the current code, but it can be removed without losing C++03 functionality. - In C++03 you have to create injected work with one of the proton::make_work() overloads. This works like the C++11 std::bind and can create a function-like object from a function pointer and arguments, or from a member function pointer, an object pointer and arguments. - In C++11 you can use proton::make_work, std::function<void()>, a lambda expression, or any function-like object to create injected work. WIP: solidify work & work_queue API
WIP: remove schedule_work API leaving only make_work WIP: Improve work documentation a little Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2a7c2315 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2a7c2315 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2a7c2315 Branch: refs/heads/go1 Commit: 2a7c2315e4df9f068ff8e068d5f7d3a8fe229b0a Parents: 00dda18 Author: Andrew Stitcher <astitc...@apache.org> Authored: Tue Oct 10 16:27:05 2017 -0400 Committer: Andrew Stitcher <astitc...@apache.org> Committed: Tue Oct 10 23:38:09 2017 -0400 ---------------------------------------------------------------------- examples/cpp/broker.cpp | 24 +- examples/cpp/scheduled_send_03.cpp | 6 +- examples/cpp/service_bus.cpp | 17 +- .../bindings/cpp/include/proton/container.hpp | 3 + .../bindings/cpp/include/proton/work_queue.hpp | 388 +++++++++---------- proton-c/bindings/cpp/src/container.cpp | 2 + proton-c/bindings/cpp/src/container_test.cpp | 2 +- proton-c/bindings/cpp/src/work_queue.cpp | 8 + 8 files changed, 210 insertions(+), 240 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a7c2315/examples/cpp/broker.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp index 5cf0c85..a1c379b 100644 --- a/examples/cpp/broker.cpp +++ b/examples/cpp/broker.cpp @@ -146,7 +146,7 @@ class Queue { DOUT(std::cerr << "(" << current_->second << ") ";); if (current_->second>0) { DOUT(std::cerr << current_->first << " ";); - proton::schedule_work(current_->first, &Sender::sendMsg, current_->first, messages_.front()); + current_->first->add(make_work(&Sender::sendMsg, current_->first, messages_.front())); messages_.pop_front(); --current_->second; ++current_; @@ -185,14 +185,14 @@ public: // If we're about to erase the current 
subscription move on if (current_ != subscriptions_.end() && current_->first==s) ++current_; subscriptions_.erase(s); - proton::schedule_work(s, &Sender::unsubscribed, s); + s->add(make_work(&Sender::unsubscribed, s)); } }; // We have credit to send a message. void Sender::on_sendable(proton::sender &sender) { if (queue_) { - proton::schedule_work(queue_, &Queue::flow, queue_, this, sender.credit()); + queue_->add(make_work(&Queue::flow, queue_, this, sender.credit())); } else { pending_credit_ = sender.credit(); } @@ -200,7 +200,7 @@ void Sender::on_sendable(proton::sender &sender) { void Sender::on_sender_close(proton::sender &sender) { if (queue_) { - proton::schedule_work(queue_, &Queue::unsubscribe, queue_, this); + queue_->add(make_work(&Queue::unsubscribe, queue_, this)); } else { // TODO: Is it possible to be closed before we get the queue allocated? // If so, we should have a way to mark the sender deleted, so we can delete @@ -214,12 +214,12 @@ void Sender::boundQueue(Queue* q, std::string qn) { queue_ = q; queue_name_ = qn; - proton::schedule_work(q, &Queue::subscribe, q, this); + q->add(make_work(&Queue::subscribe, q, this)); sender_.open(proton::sender_options() .source((proton::source_options().address(queue_name_))) .handler(*this)); if (pending_credit_>0) { - proton::schedule_work(queue_, &Queue::flow, queue_, this, pending_credit_); + queue_->add(make_work(&Queue::flow, queue_, this, pending_credit_)); } std::cout << "sending from " << queue_name_ << std::endl; } @@ -244,7 +244,7 @@ class Receiver : public proton::messaging_handler { void queueMsgs() { DOUT(std::cerr << "Receiver: " << this << " queueing " << messages_.size() << " msgs to: " << queue_ << "\n";); while (!messages_.empty()) { - proton::schedule_work(queue_, &Queue::queueMsg, queue_, messages_.front()); + queue_->add(make_work(&Queue::queueMsg, queue_, messages_.front())); messages_.pop_front(); } } @@ -302,7 +302,7 @@ public: } else { q = i->second; } - 
proton::schedule_work(&connection, &T::boundQueue, &connection, q, qn); + connection.add(make_work(&T::boundQueue, &connection, q, qn)); } void findQueueSender(Sender* s, std::string qn) { @@ -332,7 +332,7 @@ public: std::string qn = sender.source().dynamic() ? "" : sender.source().address(); Sender* s = new Sender(sender, senders_); senders_[sender] = s; - proton::schedule_work(&queue_manager_, &QueueManager::findQueueSender, &queue_manager_, s, qn); + queue_manager_.add(make_work(&QueueManager::findQueueSender, &queue_manager_, s, qn)); } // A receiver receives messages from a publisher to a queue. @@ -348,7 +348,7 @@ public: DOUT(std::cerr << "ODD - trying to attach to a empty address\n";); } Receiver* r = new Receiver(receiver); - proton::schedule_work(&queue_manager_, &QueueManager::findQueueReceiver, &queue_manager_, r, qname); + queue_manager_.add(make_work(&QueueManager::findQueueReceiver, &queue_manager_, r, qname)); } } @@ -359,7 +359,7 @@ public: if (j == senders_.end()) continue; Sender* s = j->second; if (s->queue_) { - proton::schedule_work(s->queue_, &Queue::unsubscribe, s->queue_, s); + s->queue_->add(make_work(&Queue::unsubscribe, s->queue_, s)); } senders_.erase(j); } @@ -377,7 +377,7 @@ public: if (j == senders_.end()) continue; Sender* s = j->second; if (s->queue_) { - proton::schedule_work(s->queue_, &Queue::unsubscribe, s->queue_, s); + s->queue_->add(make_work(&Queue::unsubscribe, s->queue_, s)); } } delete this; // All done. 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a7c2315/examples/cpp/scheduled_send_03.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/scheduled_send_03.cpp b/examples/cpp/scheduled_send_03.cpp index 8f058c7..5299bde 100644 --- a/examples/cpp/scheduled_send_03.cpp +++ b/examples/cpp/scheduled_send_03.cpp @@ -60,8 +60,8 @@ class scheduled_sender : public proton::messaging_handler { void on_sender_open(proton::sender & s) OVERRIDE { work_queue = &s.work_queue(); - proton::schedule_work(work_queue, timeout, &scheduled_sender::cancel, this, s); - proton::schedule_work(work_queue, interval, &scheduled_sender::tick, this, s); + work_queue->schedule(timeout, make_work(&scheduled_sender::cancel, this, s)); + work_queue->schedule(interval, make_work(&scheduled_sender::tick, this, s)); } void cancel(proton::sender sender) { @@ -71,7 +71,7 @@ class scheduled_sender : public proton::messaging_handler { void tick(proton::sender sender) { if (!canceled) { - proton::schedule_work(work_queue, interval, &scheduled_sender::tick, this, sender); // Next tick + work_queue->schedule(interval, make_work(&scheduled_sender::tick, this, sender)); // Next tick if (sender.credit() > 0) // Only send if we have credit send(sender); else http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a7c2315/examples/cpp/service_bus.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/service_bus.cpp b/examples/cpp/service_bus.cpp index 7f9e3eb..3ec7ad1 100644 --- a/examples/cpp/service_bus.cpp +++ b/examples/cpp/service_bus.cpp @@ -129,20 +129,9 @@ class session_receiver : public proton::messaging_handler { proton::container *container; proton::receiver receiver; - - struct process_timeout_fn : public proton::void_function0 { - session_receiver& parent; - process_timeout_fn(session_receiver& sr) : parent(sr) {} - void operator()() { parent.process_timeout(); } - }; - - process_timeout_fn 
do_process_timeout; - - public: session_receiver(const std::string &c, const std::string &e, - const char *sid) : connection_url(c), entity(e), message_count(0), closed(false), read_timeout(5000), - last_read(0), container(0), do_process_timeout(*this) { + const char *sid) : connection_url(c), entity(e), message_count(0), closed(false), read_timeout(5000), last_read(0), container(0) { if (sid) session_identifier = std::string(sid); // session_identifier is now either empty/null or an AMQP string type. @@ -172,7 +161,7 @@ class session_receiver : public proton::messaging_handler { // identifier if none was specified). last_read = proton::timestamp::now(); // Call this->process_timeout after read_timeout. - container->schedule(read_timeout, do_process_timeout); + container->schedule(read_timeout, make_work(&session_receiver::process_timeout, this)); } void on_receiver_open(proton::receiver &r) OVERRIDE { @@ -202,7 +191,7 @@ class session_receiver : public proton::messaging_handler { std::cout << "Done. No more messages." << std::endl; } else { proton::duration next = deadline - now; - container->schedule(next, do_process_timeout); + container->schedule(next, make_work(&session_receiver::process_timeout, this)); } } }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a7c2315/proton-c/bindings/cpp/include/proton/container.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp index f0f5bc6..d30e45a 100644 --- a/proton-c/bindings/cpp/include/proton/container.hpp +++ b/proton-c/bindings/cpp/include/proton/container.hpp @@ -305,6 +305,9 @@ class PN_CPP_CLASS_EXTERN container { /// `std::function<void()>` type for the `fn` parameter. 
PN_CPP_EXTERN void schedule(duration dur, work fn); + /// @deprecated + PN_CPP_EXTERN void schedule(duration dur, void_function0& fn); + /// @cond INTERNAL /// This is a hack to ensure that the C++03 version is declared /// only during the compilation of the library http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a7c2315/proton-c/bindings/cpp/include/proton/work_queue.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/work_queue.hpp b/proton-c/bindings/cpp/include/proton/work_queue.hpp index f4b259f..801c2dc 100644 --- a/proton-c/bindings/cpp/include/proton/work_queue.hpp +++ b/proton-c/bindings/cpp/include/proton/work_queue.hpp @@ -30,6 +30,10 @@ #include "./internal/pn_unique_ptr.hpp" #include <functional> +#include <utility> +#if PN_CPP_HAS_LAMBDAS && PN_CPP_HAS_VARIADIC_TEMPLATES +#include <type_traits> +#endif struct pn_connection_t; struct pn_session_t; @@ -45,39 +49,45 @@ namespace proton { /// It can be created from a function that takes no parameters and /// returns no value. 
namespace v03 { -class work { - public: - /// **Unsettled API** - work(void_function0& f): item_(&f) {} - /// **Unsettled API** - work() {} +/// @cond INTERNAL +struct invocable { + invocable() {} + virtual ~invocable() {} - /// **Unsettled API** - void operator()() { (*item_)(); } + virtual invocable& clone() const = 0; + virtual void operator() () = 0; +}; - ~work() {} +template <class T> +struct invocable_cloner : invocable { + virtual ~invocable_cloner() {} + virtual invocable& clone() const { + return *new T(static_cast<T const&>(*this)); + } +}; +struct invocable_wrapper { + invocable_wrapper(): wrapped_(0) {} + invocable_wrapper(const invocable_wrapper& w): wrapped_(&w.wrapped_->clone()) {} + invocable_wrapper& operator=(invocable_wrapper& that) {std::swap(wrapped_, that.wrapped_); return *this; } + ~invocable_wrapper() { delete wrapped_; } - private: - void_function0* item_; + invocable_wrapper(const invocable& i): wrapped_(&i.clone()) {} + void operator()() { (*wrapped_)(); } + + invocable* wrapped_; }; -} +/// @endcond -#if PN_CPP_HAS_LAMBDAS && PN_CPP_HAS_VARIADIC_TEMPLATES -namespace v11 { class work { public: /// **Unsettled API** - work(void_function0& f): item_( [&f]() { f(); }) {} - - /// **Unsettled API** work() {} - /// **Unsettled API** - Construct a unit of work from a - /// std::function. - template <class T> - work(T f): item_(f) {} + /// @cond INTERNAL + work(const invocable& i): item_(i) {} + /// @endcond /// **Unsettled API** void operator()() { item_(); } @@ -85,102 +95,14 @@ class work { ~work() {} private: - std::function<void()> item_; -}; -} -#endif - -#if PN_CPP_HAS_LAMBDAS && PN_CPP_HAS_VARIADIC_TEMPLATES -using v11::work; -#else -using v03::work; -#endif - -/// **Unsettled API** - A context for thread-safe execution of work. -/// -/// Event-handler functions associated with a single -/// `proton::connection` are called in sequence. 
The connection's -/// `proton::work_queue` allows you to "inject" extra work from -/// any thread and have it executed in the same sequence. -/// -/// You may also create arbitrary `proton::work_queue` objects backed -/// by a @ref container that allow other objects to have their own -/// serialised work queues that can have work injected safely from -/// other threads. The @ref container ensures that the work is -/// correctly serialised. -/// -/// The `proton::work` class represents the work to be queued and can -/// be created from a function that takes no parameters and returns no -/// value. -class PN_CPP_CLASS_EXTERN work_queue { - /// @cond INTERNAL - class impl; - work_queue& operator=(impl* i); - /// @endcond - - public: - /// **Unsettled API** - Create a work queue. - PN_CPP_EXTERN work_queue(); - - /// **Unsettled API** - Create a work queue backed by a container. - PN_CPP_EXTERN work_queue(container&); - - PN_CPP_EXTERN ~work_queue(); - - /// **Unsettled API** - Add work `fn` to the work queue. - - /// Work `fn` will be called serially with other work in the queue. - /// The work may be deferred and executed in another thread. - /// - /// @return true if `fn` has been or will be called; false if the - /// event loops is ended or `fn` cannot be injected for any other - /// reason. - PN_CPP_EXTERN bool add(work fn); - - /// @cond INTERNAL - /// This is a hack to ensure that the C++03 version is declared - /// only during the compilation of the library -#if PN_CPP_HAS_LAMBDAS && PN_CPP_HAS_VARIADIC_TEMPLATES && defined(qpid_proton_cpp_EXPORTS) - PN_CPP_EXTERN bool add(v03::work fn); -#endif - /// @endcond - - /// **Unsettled API** - Add work `fn` to the work queue after a - /// duration. - /// - /// Scheduled execution is "best effort". It may not be possible - /// to inject the work after the elapsed duration. There will be - /// no indication of this. 
- /// - /// @copydetails add() - PN_CPP_EXTERN void schedule(duration, work fn); - - /// @cond INTERNAL - /// This is a hack to ensure that the C++03 version is declared - /// only during the compilation of the library -#if PN_CPP_HAS_LAMBDAS && PN_CPP_HAS_VARIADIC_TEMPLATES && defined(qpid_proton_cpp_EXPORTS) - PN_CPP_EXTERN void schedule(duration, v03::work fn); -#endif - /// @endcond - - private: - PN_CPP_EXTERN static work_queue& get(pn_connection_t*); - PN_CPP_EXTERN static work_queue& get(pn_session_t*); - PN_CPP_EXTERN static work_queue& get(pn_link_t*); - - internal::pn_unique_ptr<impl> impl_; - - /// @cond INTERNAL - friend class container; - friend class io::connection_driver; - /// @endcond + invocable_wrapper item_; }; -// Utilities to make injecting functions/member functions palatable in C++03 -// Lots of repetition to handle functions with up to 3 arguments -#if !PN_CPP_HAS_LAMBDAS || !PN_CPP_HAS_VARIADIC_TEMPLATES +/// @cond INTERNAL +// Utilities to make work from functions/member functions (C++03 version) +// Lots of repetition to handle functions/member functions with up to 3 arguments template <class R> -struct work0 : public proton::void_function0 { +struct work0 : public invocable_cloner<work0<R> > { R (* fn_)(); work0(R (* f)()) : @@ -188,12 +110,11 @@ struct work0 : public proton::void_function0 { void operator()() { (*fn_)(); - delete this; } }; template <class R, class A> -struct work1 : public proton::void_function0 { +struct work1 : public invocable_cloner<work1<R,A> > { R (* fn_)(A); A a_; @@ -202,12 +123,11 @@ struct work1 : public proton::void_function0 { void operator()() { (*fn_)(a_); - delete this; } }; template <class R, class A, class B> -struct work2 : public proton::void_function0 { +struct work2 : public invocable_cloner<work2<R,A,B> > { R (* fn_)(A, B); A a_; B b_; @@ -217,12 +137,11 @@ struct work2 : public proton::void_function0 { void operator()() { (*fn_)(a_, b_); - delete this; } }; template <class R, class A, class B, 
class C> -struct work3 : public proton::void_function0 { +struct work3 : public invocable_cloner<work3<R,A,B,C> > { R (* fn_)(A, B, C); A a_; B b_; @@ -233,12 +152,11 @@ struct work3 : public proton::void_function0 { void operator()() { (*fn_)(a_, b_, c_); - delete this; } }; template <class R, class T> -struct work_pmf0 : public proton::void_function0 { +struct work_pmf0 : public invocable_cloner<work_pmf0<R,T> > { T& holder_; R (T::* fn_)(); @@ -247,12 +165,11 @@ struct work_pmf0 : public proton::void_function0 { void operator()() { (holder_.*fn_)(); - delete this; } }; template <class R, class T, class A> -struct work_pmf1 : public proton::void_function0 { +struct work_pmf1 : public invocable_cloner<work_pmf1<R,T,A> > { T& holder_; R (T::* fn_)(A); A a_; @@ -262,12 +179,11 @@ struct work_pmf1 : public proton::void_function0 { void operator()() { (holder_.*fn_)(a_); - delete this; } }; template <class R, class T, class A, class B> -struct work_pmf2 : public proton::void_function0 { +struct work_pmf2 : public invocable_cloner<work_pmf2<R,T,A,B> > { T& holder_; R (T::* fn_)(A, B); A a_; @@ -278,12 +194,11 @@ struct work_pmf2 : public proton::void_function0 { void operator()() { (holder_.*fn_)(a_, b_); - delete this; } }; template <class R, class T, class A, class B, class C> -struct work_pmf3 : public proton::void_function0 { +struct work_pmf3 : public invocable_cloner<work_pmf3<R,T,A,B,C> > { T& holder_; R (T::* fn_)(A, B, C); A a_; @@ -295,136 +210,189 @@ struct work_pmf3 : public proton::void_function0 { void operator()() { (holder_.*fn_)(a_, b_, c_); - delete this; } }; +/// @endcond /// make_work is the equivalent of C++11 std::bind for C++03 /// It will bind both free functions and pointers to member functions template <class R, class T> -void_function0& make_work(R (T::*f)(), T* t) { - return *new work_pmf0<R, T>(f, *t); +work make_work(R (T::*f)(), T* t) { + return work_pmf0<R, T>(f, *t); } template <class R, class T, class A> -void_function0& make_work(R 
(T::*f)(A), T* t, A a) { - return *new work_pmf1<R, T, A>(f, *t, a); +work make_work(R (T::*f)(A), T* t, A a) { + return work_pmf1<R, T, A>(f, *t, a); } template <class R, class T, class A, class B> -void_function0& make_work(R (T::*f)(A, B), T* t, A a, B b) { - return *new work_pmf2<R, T, A, B>(f, *t, a, b); +work make_work(R (T::*f)(A, B), T* t, A a, B b) { + return work_pmf2<R, T, A, B>(f, *t, a, b); } template <class R, class T, class A, class B, class C> -void_function0& make_work(R (T::*f)(A, B, C), T* t, A a, B b, C c) { - return *new work_pmf3<R, T, A, B, C>(f, *t, a, b, c); +work make_work(R (T::*f)(A, B, C), T* t, A a, B b, C c) { + return work_pmf3<R, T, A, B, C>(f, *t, a, b, c); } template <class R> -void_function0& make_work(R (*f)()) { - return *new work0<R>(f); +work make_work(R (*f)()) { + return work0<R>(f); } template <class R, class A> -void_function0& make_work(R (*f)(A), A a) { - return *new work1<R, A>(f, a); +work make_work(R (*f)(A), A a) { + return work1<R, A>(f, a); } template <class R, class A, class B> -void_function0& make_work(R (*f)(A, B), A a, B b) { - return *new work2<R, A, B>(f, a, b); +work make_work(R (*f)(A, B), A a, B b) { + return work2<R, A, B>(f, a, b); } template <class R, class A, class B, class C> -void_function0& make_work(R (*f)(A, B, C), A a, B b, C c) { - return *new work3<R, A, B, C>(f, a, b, c); +work make_work(R (*f)(A, B, C), A a, B b, C c) { + return work3<R, A, B, C>(f, a, b, c); } -namespace { -template <class T> -bool schedule_work_helper(T t, void_function0& w) { - bool r = t->add(w); - if (!r) delete &w; - return r; -} } -/// schedule_work is a convenience that is used for C++03 code to defer function calls -/// to a work_queue -template <class WQ, class F> -bool schedule_work(WQ wq, F f) { - return schedule_work_helper(wq, make_work(f)); -} +#if PN_CPP_HAS_LAMBDAS && PN_CPP_HAS_VARIADIC_TEMPLATES +namespace v11 { +class work { + public: + /// **Unsettled API** + work() {} -template <class WQ, class F, 
class A> -bool schedule_work(WQ wq, F f, A a) { - return schedule_work_helper(wq, make_work(f, a)); -} + /// **Unsettled API** + /// Construct a unit of work from anything + /// function-like that takes no arguments and returns + /// no result. + /// + template <class T, + // Make sure we don't match the copy or move constructors + class = typename std::enable_if<!std::is_same<typename std::decay<T>::type,work>::value>::type + > + work(T&& f): item_(std::forward<T>(f)) {} -template <class WQ, class F, class A, class B> -bool schedule_work(WQ wq, F f, A a, B b) { - return schedule_work_helper(wq, make_work(f, a, b)); -} + /// **Unsettled API** + /// Execute the piece of work + void operator()() { item_(); } -template <class WQ, class F, class A, class B, class C> -bool schedule_work(WQ wq, F f, A a, B b, C c) { - return schedule_work_helper(wq, make_work(f, a, b, c)); -} + ~work() {} -template <class WQ, class F, class A, class B, class C, class D> -bool schedule_work(WQ wq, F f, A a, B b, C c, D d) { - return schedule_work_helper(wq, make_work(f, a, b, c, d)); -} + private: + std::function<void()> item_; +}; -template <class WQ, class F> -void schedule_work(WQ wq, duration dn, F f) { - wq->schedule(dn, make_work(f)); +/// **Unsettled API** +/// Make a unit of work from either a function or a member function +/// and an object pointer. +/// +/// This C++11 version is just a wrapper for std::bind +template <class... Rest> +work make_work(Rest&&... 
r) { + return std::bind(std::forward<Rest>(r)...); } -template <class WQ, class F, class A> -void schedule_work(WQ wq, duration dn, F f, A a) { - wq->schedule(dn, make_work(f, a)); } -template <class WQ, class F, class A, class B> -void schedule_work(WQ wq, duration dn, F f, A a, B b) { - wq->schedule(dn, make_work(f, a, b)); -} +using v11::work; +using v11::make_work; -template <class WQ, class F, class A, class B, class C> -void schedule_work(WQ wq, duration dn, F f, A a, B b, C c) { - wq->schedule(dn, make_work(f, a, b, c)); -} +#else -template <class WQ, class F, class A, class B, class C, class D> -void schedule_work(WQ wq, duration dn, F f, A a, B b, C c, D d) { - wq->schedule(dn, make_work(f, a, b, c, d)); -} +using v03::work; +using v03::make_work; -#else -// The C++11 version is *much* simpler and even so more general! -// These definitions encompass everything in the C++03 section +#endif -/// **Unsettled API** -template <class WQ, class... Rest> -bool schedule_work(WQ wq, Rest&&... r) { - return wq->add(std::bind(std::forward<Rest>(r)...)); -} +/// **Unsettled API** - A context for thread-safe execution of work. +/// +/// Event-handler functions associated with a single +/// `proton::connection` are called in sequence. The connection's +/// `proton::work_queue` allows you to "inject" extra work from +/// any thread and have it executed in the same sequence. +/// +/// You may also create arbitrary `proton::work_queue` objects backed +/// by a @ref container that allow other objects to have their own +/// serialised work queues that can have work injected safely from +/// other threads. The @ref container ensures that the work is +/// correctly serialised. +/// +/// The `proton::work` class represents the work to be queued and can +/// be created from a function that takes no parameters and returns no +/// value. 
+class PN_CPP_CLASS_EXTERN work_queue { + /// @cond INTERNAL + class impl; + work_queue& operator=(impl* i); + /// @endcond -/// **Unsettled API** -template <class WQ, class... Rest> -void schedule_work(WQ wq, duration d, Rest&&... r) { - wq->schedule(d, std::bind(std::forward<Rest>(r)...)); -} + public: + /// **Unsettled API** - Create a work queue. + PN_CPP_EXTERN work_queue(); -/// **Unsettled API** -template <class... Rest> -work make_work(Rest&&... r) { - return std::bind(std::forward<Rest>(r)...); -} + /// **Unsettled API** - Create a work queue backed by a container. + PN_CPP_EXTERN work_queue(container&); + + PN_CPP_EXTERN ~work_queue(); + /// **Unsettled API** - Add work `fn` to the work queue. + + /// Work `fn` will be called serially with other work in the queue. + /// The work may be deferred and executed in another thread. + /// + /// @return true if `fn` has been or will be called; false if the + /// event loops is ended or `fn` cannot be injected for any other + /// reason. + PN_CPP_EXTERN bool add(work fn); + + /// @deprecated + PN_CPP_EXTERN bool add(void_function0& fn); + + /// @cond INTERNAL + /// This is a hack to ensure that the C++03 version is declared + /// only during the compilation of the library +#if PN_CPP_HAS_LAMBDAS && PN_CPP_HAS_VARIADIC_TEMPLATES && defined(qpid_proton_cpp_EXPORTS) + PN_CPP_EXTERN bool add(v03::work fn); #endif + /// @endcond + + /// **Unsettled API** - Add work `fn` to the work queue after a + /// duration. + /// + /// Scheduled execution is "best effort". It may not be possible + /// to inject the work after the elapsed duration. There will be + /// no indication of this. 
+ /// + /// @copydetails add() + PN_CPP_EXTERN void schedule(duration, work fn); + + /// @deprecated + PN_CPP_EXTERN void schedule(duration, void_function0& fn); + + /// @cond INTERNAL + /// This is a hack to ensure that the C++03 version is declared + /// only during the compilation of the library +#if PN_CPP_HAS_LAMBDAS && PN_CPP_HAS_VARIADIC_TEMPLATES && defined(qpid_proton_cpp_EXPORTS) + PN_CPP_EXTERN void schedule(duration, v03::work fn); +#endif + /// @endcond + + private: + PN_CPP_EXTERN static work_queue& get(pn_connection_t*); + PN_CPP_EXTERN static work_queue& get(pn_session_t*); + PN_CPP_EXTERN static work_queue& get(pn_link_t*); + + internal::pn_unique_ptr<impl> impl_; + + /// @cond INTERNAL + friend class container; + friend class io::connection_driver; + /// @endcond +}; } // proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a7c2315/proton-c/bindings/cpp/src/container.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp index 659ba01..c67f338 100644 --- a/proton-c/bindings/cpp/src/container.cpp +++ b/proton-c/bindings/cpp/src/container.cpp @@ -116,6 +116,8 @@ void container::schedule(duration d, v03::work f) { return impl_->schedule(d, f) void container::schedule(duration d, v11::work f) { return impl_->schedule(d, f); } #endif +void container::schedule(duration d, void_function0& f) { return impl_->schedule(d, make_work(&void_function0::operator(), &f)); } + void container::client_connection_options(const connection_options& c) { impl_->client_connection_options(c); } connection_options container::client_connection_options() const { return impl_->client_connection_options(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a7c2315/proton-c/bindings/cpp/src/container_test.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_test.cpp 
b/proton-c/bindings/cpp/src/container_test.cpp index 3415d9d..c2b1609 100644 --- a/proton-c/bindings/cpp/src/container_test.cpp +++ b/proton-c/bindings/cpp/src/container_test.cpp @@ -221,7 +221,7 @@ struct hang_tester : public proton::messaging_handler { void on_container_start(proton::container& c) PN_CPP_OVERRIDE { port = listen_on_random_port(c, listener); - schedule_work(&c, proton::duration(250), &hang_tester::connect, this, &c); + c.schedule(proton::duration(250), make_work(&hang_tester::connect, this, &c)); } void on_connection_open(proton::connection& c) PN_CPP_OVERRIDE { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a7c2315/proton-c/bindings/cpp/src/work_queue.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/work_queue.cpp b/proton-c/bindings/cpp/src/work_queue.cpp index 7ec072e..f689214 100644 --- a/proton-c/bindings/cpp/src/work_queue.cpp +++ b/proton-c/bindings/cpp/src/work_queue.cpp @@ -51,6 +51,10 @@ bool work_queue::add(v11::work f) { } #endif +bool work_queue::add(void_function0& f) { + return add(make_work(&void_function0::operator(), &f)); +} + void work_queue::schedule(duration d, v03::work f) { // If we have no actual work queue, then can't defer if (!impl_) return; @@ -65,6 +69,10 @@ void work_queue::schedule(duration d, v11::work f) { } #endif +void work_queue::schedule(duration d, void_function0& f) { + schedule(d, make_work(&void_function0::operator(), &f)); +} + work_queue& work_queue::get(pn_connection_t* c) { return connection_context::get(c).work_queue_; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org