Repository: qpid-proton Updated Branches: refs/heads/master a8f0c956a -> e631bf6b1
PROTON-1567: Implement failover urls - Example "reliable" client sending and receiving messages - Also add jitter to retry backoff (with C++11) Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e631bf6b Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e631bf6b Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e631bf6b Branch: refs/heads/master Commit: e631bf6b11960d9687d42dfdde1ff4c65804981c Parents: a8f0c95 Author: Andrew Stitcher <astitc...@apache.org> Authored: Thu Aug 31 17:31:17 2017 -0400 Committer: Andrew Stitcher <astitc...@apache.org> Committed: Fri Sep 1 16:39:17 2017 -0400 ---------------------------------------------------------------------- examples/cpp/CMakeLists.txt | 1 + examples/cpp/reconnect_client.cpp | 144 +++++++++++++++++++ .../cpp/include/proton/connection_options.hpp | 1 - .../cpp/include/proton/internal/config.hpp | 8 ++ .../cpp/include/proton/reconnect_options.hpp | 5 +- proton-c/bindings/cpp/src/contexts.cpp | 3 +- proton-c/bindings/cpp/src/include/contexts.hpp | 1 + .../cpp/src/include/reconnect_options_impl.hpp | 4 + .../cpp/src/proactor_container_impl.cpp | 59 ++++++-- proton-c/bindings/cpp/src/reconnect_options.cpp | 1 + 10 files changed, 213 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/examples/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index d116913..a8d9d34 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -41,6 +41,7 @@ foreach(example helloworld_direct simple_recv simple_send + reconnect_client message_properties scheduled_send_03 direct_recv http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/examples/cpp/reconnect_client.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/reconnect_client.cpp b/examples/cpp/reconnect_client.cpp new file mode 100644 index 0000000..6075f03 --- /dev/null +++ b/examples/cpp/reconnect_client.cpp @@ -0,0 +1,144 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/connection_options.hpp> +#include <proton/container.hpp> +#include <proton/default_container.hpp> +#include <proton/delivery.hpp> +#include <proton/link.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/reconnect_options.hpp> +#include <proton/value.hpp> +#include <proton/types.hpp> + +#include <iostream> +#include <map> +#include <string> + +#include "fake_cpp11.hpp" + +class reconnect_client : public proton::messaging_handler { + std::string url; + std::string address; + std::vector<std::string> failovers; + proton::sender sender; + int sent; + int expected; + int received; + + public: + reconnect_client(const std::string &u, const std::string& a, int c, const std::vector<std::string>& f) : + url(u), address(a), failovers(f), sent(0), expected(c), received(0) {} + + private: + void on_container_start(proton::container &c) OVERRIDE { + proton::connection_options co; + proton::reconnect_options ro; + + ro.failover_urls(failovers); + co.reconnect(ro); + c.connect(url, co); + } + + void on_connection_open(proton::connection & c) OVERRIDE { + c.open_receiver(address); + c.open_sender(address); + // reconnect we probably lost the last message sent + sent = received; + std::cout << "simple_recv listening on " << url << std::endl; + } + + void on_message(proton::delivery &d, proton::message &msg) OVERRIDE { + if (proton::coerce<int>(msg.id()) < received) { + return; // Ignore duplicate + } + + if (expected == 0 || received < expected) { + std::cout << msg.body() << std::endl; + received++; + + if (received == expected) { + d.receiver().close(); + sender.close(); + d.connection().close(); + } else { + // See if we can send any messages now + send(sender); + } + } + } + + void send(proton::sender& s) { + // Only send with credit and only allow one outstanding message + while (s.credit() && sent < received+1) { + std::map<std::string, int> m; + m["sequence"] = sent + 1; + + proton::message msg; + msg.id(sent + 1); + msg.body(m); + + std::cout << "Sending: " << sent+1 << std::endl; + s.send(msg); + sent++; + } + } + + void on_sender_open(proton::sender & s) OVERRIDE { + sender = s; + } + + void on_sendable(proton::sender &s) OVERRIDE { + send(s); + } +}; + +int main(int argc, const char** argv) { + try { + if (argc < 4) { + std ::cerr << + "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT FAILOVER-URL...\n" + "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n" + "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n" + "MESSAGE-COUNT: number of messages to receive\n" + "FAILOVER_URL...: zero or more failover urls\n"; + return 1; + } + const char *url = argv[1]; + const char *address = argv[2]; + int message_count = atoi(argv[3]); + std::vector<std::string> failovers(&argv[4], &argv[argc]); + + reconnect_client client(url, address, message_count, failovers); + proton::default_container(client).run(); + + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/include/proton/connection_options.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp index 066e8cf..02a9027 100644 --- a/proton-c/bindings/cpp/include/proton/connection_options.hpp +++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp @@ -149,7 +149,6 @@ class connection_options { /// **Experimental** - Options for reconnect on outgoing connections. PN_CPP_EXTERN connection_options& reconnect(reconnect_options &); - /// Update option values from values set in other. PN_CPP_EXTERN connection_options& update(const connection_options& other); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/include/proton/internal/config.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/internal/config.hpp b/proton-c/bindings/cpp/include/proton/internal/config.hpp index 54b014b..e600f21 100644 --- a/proton-c/bindings/cpp/include/proton/internal/config.hpp +++ b/proton-c/bindings/cpp/include/proton/internal/config.hpp @@ -83,6 +83,10 @@ #define PN_CPP_HAS_DELETED_FUNCTIONS PN_CPP_HAS_CPP11 #endif +#ifndef PN_CPP_HAS_THREAD_LOCAL +#define PN_CPP_HAS_THREAD_LOCAL PN_CPP_HAS_CPP11 +#endif + #ifndef PN_CPP_HAS_STD_FUNCTION #define PN_CPP_HAS_STD_FUNCTION PN_CPP_HAS_CPP11 #endif @@ -95,6 +99,10 @@ #define PN_CPP_HAS_CHRONO PN_CPP_HAS_CPP11 #endif +#ifndef PN_CPP_HAS_RANDOM +#define PN_CPP_HAS_RANDOM PN_CPP_HAS_CPP11 +#endif + #ifndef PN_CPP_HAS_STD_MUTEX #define PN_CPP_HAS_STD_MUTEX PN_CPP_HAS_CPP11 #endif http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/include/proton/reconnect_options.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/reconnect_options.hpp b/proton-c/bindings/cpp/include/proton/reconnect_options.hpp index e8ed02c..abbd517 100644 --- a/proton-c/bindings/cpp/include/proton/reconnect_options.hpp +++ b/proton-c/bindings/cpp/include/proton/reconnect_options.hpp @@ -28,6 +28,7 @@ #include "./source.hpp" #include <string> +#include <vector> namespace proton { @@ -67,8 +68,8 @@ class reconnect_options { /// Maximum reconnect attempts (default 0, meaning no limit) PN_CPP_EXTERN reconnect_options& max_attempts(int); - /// TODO: failover_urls - + /// Alternative connection urls used for failover + PN_CPP_EXTERN reconnect_options& failover_urls(const std::vector<std::string>& urls); private: class impl; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/contexts.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp index 812d573..d076478 100644 --- a/proton-c/bindings/cpp/src/contexts.cpp +++ b/proton-c/bindings/cpp/src/contexts.cpp @@ -72,7 +72,8 @@ connection_context::connection_context() : {} reconnect_context::reconnect_context(const reconnect_options& ro, const connection_options& co) : - reconnect_options_(new reconnect_options(ro)), connection_options_(new connection_options(co)), retries_(0) + reconnect_options_(new reconnect_options(ro)), connection_options_(new connection_options(co)), + retries_(0), current_url_(-1) {} listener_context::listener_context() : listen_handler_(0) {} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/include/contexts.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp b/proton-c/bindings/cpp/src/include/contexts.hpp index 7920d70..7797b08 100644 --- a/proton-c/bindings/cpp/src/include/contexts.hpp +++ b/proton-c/bindings/cpp/src/include/contexts.hpp @@ -106,6 +106,7 @@ class reconnect_context { internal::pn_unique_ptr<const connection_options> connection_options_; duration delay_; int retries_; + int current_url_; }; class listener_context : public context { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp b/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp index fc90508..2aca82d 100644 --- a/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp +++ b/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp @@ -24,6 +24,9 @@ #include "proton/duration.hpp" +#include <string> +#include <vector> + namespace proton { class reconnect_options::impl { @@ -34,6 +37,7 @@ class reconnect_options::impl { float delay_multiplier; duration max_delay; int max_attempts; + std::vector<std::string> failover_urls; }; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/proactor_container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp index 4f90c68..b818f65 100644 --- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp +++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp @@ -46,6 +46,13 @@ # include <thread> #endif +#if PN_CPP_HAS_RANDOM +# include <random> +#endif + +// XXXX: Debug +//#include <iostream> + namespace proton { class container::impl::common_work_queue : public work_queue::impl { @@ -179,8 +186,8 @@ pn_connection_t* container::impl::make_connection_lh( cc.container = &container_; cc.handler = mh; cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, pnc); - cc.connected_address_ = url; + setup_connection_lh(url, pnc); make_wrapper(pnc).open(opts); @@ -205,38 +212,70 @@ void container::impl::start_connection(const url& url, pn_connection_t *pnc) { void container::impl::reconnect(pn_connection_t* pnc) { connection_context& cc = connection_context::get(pnc); - reconnect_context* rc = cc.reconnect_context_.get(); + reconnect_context& rc = *cc.reconnect_context_.get(); + const reconnect_options::impl& roi = *rc.reconnect_options_->impl_; // Figure out next connection url to try - const proton::url url(cc.connected_address_); + // rc.current_url_ == -1 means try the url specified in connect, not a failover url + const proton::url url(rc.current_url_==-1 ? cc.connected_address_ : roi.failover_urls[rc.current_url_]); + + // XXXX Debug: + //std::cout << "Retries: " << rc.retries_ << " Delay: " << rc.delay_ << " Trying: " << url << "\n"; - cc.connected_address_ = url; setup_connection_lh(url, pnc); - { // Scope required to keep temporary destructor from doing pn_decref() after start_connection() - make_wrapper(pnc).open(*rc->connection_options_); + make_wrapper(pnc).open(*rc.connection_options_); + start_connection(url, pnc); + + // Did we go through all the urls? + if (rc.current_url_==int(roi.failover_urls.size())-1) { + rc.current_url_ = -1; + ++rc.retries_; + } else { + ++rc.current_url_; } - start_connection(cc.connected_address_, pnc); - rc->retries_++; +} + +namespace { +#if PN_CPP_HAS_RANDOM && PN_CPP_HAS_THREAD_LOCAL +duration random_between(duration min, duration max) +{ + static thread_local std::default_random_engine gen; + std::uniform_int_distribution<duration::numeric_type> dist{min.milliseconds(), max.milliseconds()}; + return duration(dist(gen)); +} +#else +duration random_between(duration, duration max) +{ + return max; +} +#endif } duration container::impl::next_delay(reconnect_context& rc) { // If we've not retried before do it immediately if (rc.retries_==0) return duration(0); + // If we haven't tried all failover urls yet this round do it immediately + if (rc.current_url_!=-1) return duration(0); + const reconnect_options::impl& roi = *rc.reconnect_options_->impl_; if (rc.retries_==1) { rc.delay_ = roi.delay; } else { rc.delay_ = std::min(roi.max_delay, rc.delay_ * roi.delay_multiplier); } - return rc.delay_; + return random_between(roi.delay, rc.delay_); } void container::impl::reset_reconnect(pn_connection_t* pnc) { connection_context& cc = connection_context::get(pnc); reconnect_context* rc = cc.reconnect_context_.get(); - if (rc) rc->retries_ = 0; + if (!rc) return; + + rc->delay_ = 0; + rc->retries_ = 0; + rc->current_url_ = -1; } bool container::impl::setup_reconnect(pn_connection_t* pnc) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e631bf6b/proton-c/bindings/cpp/src/reconnect_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/reconnect_options.cpp b/proton-c/bindings/cpp/src/reconnect_options.cpp index ef0d497..a469d3f 100644 --- a/proton-c/bindings/cpp/src/reconnect_options.cpp +++ b/proton-c/bindings/cpp/src/reconnect_options.cpp @@ -39,5 +39,6 @@ reconnect_options& reconnect_options::delay(duration d) { impl_->delay = d; retu reconnect_options& reconnect_options::delay_multiplier(float f) { impl_->delay_multiplier = f; return *this; } reconnect_options& reconnect_options::max_delay(duration d) { impl_->max_delay = d; return *this; } reconnect_options& reconnect_options::max_attempts(int i) { impl_->max_attempts = i; return *this; } +reconnect_options& reconnect_options::failover_urls(const std::vector<std::string>& urls) { impl_->failover_urls = urls; return *this; } } // namespace proton --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org