NO-JIRA: [cpp] fix race condition in example that caused an occasional hang
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1c44c431 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1c44c431 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1c44c431 Branch: refs/heads/go1 Commit: 1c44c431ef2acc1b37e88a0891c8e8c03bd30eb5 Parents: 5960f15 Author: Alan Conway <acon...@redhat.com> Authored: Fri Sep 28 11:11:41 2018 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Fri Sep 28 11:12:42 2018 -0400 ---------------------------------------------------------------------- .../multithreaded_client_flow_control.cpp | 48 ++++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c44c431/cpp/examples/multithreaded_client_flow_control.cpp ---------------------------------------------------------------------- diff --git a/cpp/examples/multithreaded_client_flow_control.cpp b/cpp/examples/multithreaded_client_flow_control.cpp index 93c6b3d..73ea6f6 100644 --- a/cpp/examples/multithreaded_client_flow_control.cpp +++ b/cpp/examples/multithreaded_client_flow_control.cpp @@ -61,6 +61,12 @@ std::mutex out_lock; #define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false) +// Exception raised if a sender or receiver is closed when trying to send/receive +class closed : public std::runtime_error { + public: + closed(const std::string& msg) : std::runtime_error(msg) {} +}; + // A thread-safe sending connection that blocks sending threads when there // is no AMQP credit to send messages. 
class sender : private proton::messaging_handler { @@ -151,12 +157,13 @@ class receiver : private proton::messaging_handler { proton::work_queue* work_queue_; std::queue<proton::message> buffer_; // Messages not yet returned by receive() std::condition_variable can_receive_; // Notify receivers of messages + bool closed_; public: // Connect to url receiver(proton::container& cont, const std::string& url, const std::string& address) - : work_queue_() + : work_queue_(0), closed_(false) { // NOTE:credit_window(0) disables automatic flow control. // We will use flow control to match AMQP credit to buffer capacity. @@ -168,8 +175,10 @@ class receiver : private proton::messaging_handler { proton::message receive() { std::unique_lock<std::mutex> l(lock_); // Wait for buffered messages - while (!work_queue_ || buffer_.empty()) + while (!closed_ && (!work_queue_ || buffer_.empty())) { can_receive_.wait(l); + } + if (closed_) throw closed("receiver closed"); proton::message m = std::move(buffer_.front()); buffer_.pop(); // Add a lambda to the work queue to call receive_done(). @@ -178,9 +187,16 @@ class receiver : private proton::messaging_handler { return m; } + // Thread safe void close() { std::lock_guard<std::mutex> l(lock_); - if (work_queue_) work_queue_->add([this]() { this->receiver_.connection().close(); }); + if (!closed_) { + closed_ = true; + can_receive_.notify_all(); + if (work_queue_) { + work_queue_->add([this]() { this->receiver_.connection().close(); }); + } + } } private: @@ -229,24 +245,26 @@ void send_thread(sender& s, int n) { // Receive messages till atomic remaining count is 0. // remaining is shared among all receiving threads void receive_thread(receiver& r, std::atomic_int& remaining) { - auto id = std::this_thread::get_id(); - int n = 0; - // atomically check and decrement remaining *before* receiving. - // If it is 0 or less then return, as there are no more - // messages to receive so calling r.receive() would block forever. 
- while (remaining-- > 0) { - auto m = r.receive(); - ++n; - OUT(std::cout << id << " received \"" << m.body() << '"' << std::endl); - } - OUT(std::cout << id << " received " << n << " messages" << std::endl); + try { + auto id = std::this_thread::get_id(); + int n = 0; + // atomically check and decrement remaining *before* receiving. + // If it is 0 or less then return, as there are no more + // messages to receive so calling r.receive() would block forever. + while (remaining-- > 0) { + auto m = r.receive(); + ++n; + OUT(std::cout << id << " received \"" << m.body() << '"' << std::endl); + } + OUT(std::cout << id << " received " << n << " messages" << std::endl); + } catch (const closed&) {} } int main(int argc, const char **argv) { try { if (argc != 5) { std::cerr << - "Usage: " << argv[0] << " MESSAGE-COUNT THREAD-COUNT URL\n" + "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT THREAD-COUNT\n" "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n" "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n" "MESSAGE-COUNT: number of messages to send\n" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org