Repository: qpid-proton Updated Branches: refs/heads/master 9e2649ac4 -> f2ae27d7e
PROTON-1138: handler options become link or connection options (https://reviews.apache.org/r/44927) Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f2ae27d7 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f2ae27d7 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f2ae27d7 Branch: refs/heads/master Commit: f2ae27d7e7a09c016a89be59e93d5796e01cc725 Parents: 9e2649a Author: Clifford Jansen <cliffjan...@apache.org> Authored: Mon Mar 21 17:15:24 2016 -0700 Committer: Clifford Jansen <cliffjan...@apache.org> Committed: Mon Mar 21 17:15:24 2016 -0700 ---------------------------------------------------------------------- .../cpp/include/proton/connection_options.hpp | 4 +- .../bindings/cpp/include/proton/container.hpp | 1 + .../bindings/cpp/include/proton/handler.hpp | 23 +----- proton-c/bindings/cpp/include/proton/link.hpp | 1 + .../cpp/include/proton/link_options.hpp | 17 +++- .../bindings/cpp/src/connection_options.cpp | 36 ++++----- proton-c/bindings/cpp/src/container.cpp | 2 + proton-c/bindings/cpp/src/container_impl.cpp | 19 +++-- proton-c/bindings/cpp/src/container_impl.hpp | 1 + proton-c/bindings/cpp/src/contexts.cpp | 12 +++ proton-c/bindings/cpp/src/contexts.hpp | 9 +++ proton-c/bindings/cpp/src/handler.cpp | 4 +- proton-c/bindings/cpp/src/link_options.cpp | 35 +++++--- proton-c/bindings/cpp/src/messaging_adapter.cpp | 85 ++++++++------------ proton-c/bindings/cpp/src/messaging_adapter.hpp | 12 +-- proton-c/bindings/cpp/src/proton_event.hpp | 1 + tests/tools/apps/cpp/reactor_send.cpp | 5 +- 17 files changed, 134 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/connection_options.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp index 71e12f1..1a22b73 100644 --- a/proton-c/bindings/cpp/include/proton/connection_options.hpp +++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp @@ -76,9 +76,6 @@ class connection_options { // XXX add C++11 move operations - /// Override with options from other. - PN_CPP_EXTERN void override(const connection_options& other); - /// Set a handler for the connection. PN_CPP_EXTERN connection_options& handler(class handler *); @@ -145,6 +142,7 @@ class connection_options { static pn_connection_t *pn_connection(connection &); class ssl_client_options &ssl_client_options(); class ssl_server_options &ssl_server_options(); + PN_CPP_EXTERN void update(const connection_options& other); class impl; internal::pn_unique_ptr<impl> impl_; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/container.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp index 0af1963..7e0f0ee 100644 --- a/proton-c/bindings/cpp/include/proton/container.hpp +++ b/proton-c/bindings/cpp/include/proton/container.hpp @@ -134,6 +134,7 @@ class container { friend class connector; friend class link; + friend class messaging_adapter; /// @endcond }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/handler.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp index 3bc0023..6ea11d4 100644 --- a/proton-c/bindings/cpp/include/proton/handler.hpp +++ b/proton-c/bindings/cpp/include/proton/handler.hpp @@ -41,28 +41,7 @@ class PN_CPP_CLASS_EXTERN handler { public: - /// @cond INTERNAL - /// XXX move configuration to connection or container - - /// Create a handler. - /// - /// @param prefetch set flow control to automatically pre-fetch - /// this many messages - /// - /// @param auto_accept automatically accept received messages - /// after on_message() - /// - /// @param auto_settle automatically settle on receipt of delivery - /// for sent messages - /// - /// @param peer_close_is_error treat orderly remote connection - /// close as error - PN_CPP_EXTERN handler(int prefetch=10, bool auto_accept=true, - bool auto_settle=true, - bool peer_close_is_error=false); - - /// @endcond - + PN_CPP_EXTERN handler(); PN_CPP_EXTERN virtual ~handler(); /// @name Event callbacks http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/link.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/link.hpp b/proton-c/bindings/cpp/include/proton/link.hpp index 8de2e7b..363ea16 100644 --- a/proton-c/bindings/cpp/include/proton/link.hpp +++ b/proton-c/bindings/cpp/include/proton/link.hpp @@ -151,6 +151,7 @@ PN_CPP_CLASS_EXTERN link : public internal::object<pn_link_t> , public endpoint friend class proton_event; friend class link_iterator; friend class link_options; + friend class messaging_adapter; }; /// An iterator for links. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/include/proton/link_options.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/link_options.hpp b/proton-c/bindings/cpp/include/proton/link_options.hpp index 2eb145d..7f70c4a 100644 --- a/proton-c/bindings/cpp/include/proton/link_options.hpp +++ b/proton-c/bindings/cpp/include/proton/link_options.hpp @@ -120,9 +120,6 @@ class link_options { /// Copy options. PN_CPP_EXTERN link_options& operator=(const link_options&); - /// Override with options from other. - PN_CPP_EXTERN void override(const link_options& other); - /// Set a handler for events scoped to the link. If NULL, /// link-scoped events on the link are discarded. PN_CPP_EXTERN link_options& handler(class handler *); @@ -155,6 +152,18 @@ class link_options { /// Set the local address for the link. PN_CPP_EXTERN link_options& local_address(const std::string &addr); + /// Automatically accept inbound messages that aren't otherwise + /// released, rejected or modified (default value:true). + PN_CPP_EXTERN link_options& auto_accept(bool); + + /// Automatically settle messages (default value: true). + PN_CPP_EXTERN link_options& auto_settle(bool); + + /// Set automated flow control to pre-fetch this many messages + /// (default value:10). Set to zero to disable automatic credit + /// replenishing. + PN_CPP_EXTERN link_options& credit_window(int); + /// @cond INTERNAL /// XXX need to discuss spec issues, jms versus amqp filters /// @@ -168,11 +177,13 @@ class link_options { private: void apply(link&) const; proton_handler* handler() const; + PN_CPP_EXTERN void update(const link_options& other); class impl; internal::pn_unique_ptr<impl> impl_; friend class link; + friend class container_impl; /// @endcond }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/connection_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection_options.cpp b/proton-c/bindings/cpp/src/connection_options.cpp index a5ee99e..c218d64 100644 --- a/proton-c/bindings/cpp/src/connection_options.cpp +++ b/proton-c/bindings/cpp/src/connection_options.cpp @@ -41,7 +41,7 @@ template <class T> struct option { option() : value(), set(false) {} option& operator=(const T& x) { value = x; set = true; return *this; } - void override(const option<T>& x) { if (x.set) *this = x.value; } + void update(const option<T>& x) { if (x.set) *this = x.value; } }; class connection_options::impl { @@ -124,22 +124,22 @@ class connection_options::impl { } } - void override(const impl& x) { - handler.override(x.handler); - max_frame_size.override(x.max_frame_size); - max_channels.override(x.max_channels); - idle_timeout.override(x.idle_timeout); - heartbeat.override(x.heartbeat); - container_id.override(x.container_id); - link_prefix.override(x.link_prefix); - reconnect.override(x.reconnect); - ssl_client_options.override(x.ssl_client_options); - ssl_server_options.override(x.ssl_server_options); - sasl_enabled.override(x.sasl_enabled); - sasl_allow_insecure_mechs.override(x.sasl_allow_insecure_mechs); - sasl_allowed_mechs.override(x.sasl_allowed_mechs); - sasl_config_name.override(x.sasl_config_name); - sasl_config_path.override(x.sasl_config_path); + void update(const impl& x) { + handler.update(x.handler); + max_frame_size.update(x.max_frame_size); + max_channels.update(x.max_channels); + idle_timeout.update(x.idle_timeout); + heartbeat.update(x.heartbeat); + container_id.update(x.container_id); + link_prefix.update(x.link_prefix); + reconnect.update(x.reconnect); + ssl_client_options.update(x.ssl_client_options); + ssl_server_options.update(x.ssl_server_options); + sasl_enabled.update(x.sasl_enabled); + sasl_allow_insecure_mechs.update(x.sasl_allow_insecure_mechs); + sasl_allowed_mechs.update(x.sasl_allowed_mechs); + sasl_config_name.update(x.sasl_config_name); + sasl_config_path.update(x.sasl_config_path); } }; @@ -155,7 +155,7 @@ connection_options& connection_options::operator=(const connection_options& x) { return *this; } -void connection_options::override(const connection_options& x) { impl_->override(*x.impl_); } +void connection_options::update(const connection_options& x) { impl_->update(*x.impl_); } connection_options& connection_options::handler(class handler *h) { impl_->handler = h->messaging_adapter_.get(); return *this; } connection_options& connection_options::max_frame_size(uint32_t n) { impl_->max_frame_size = n; return *this; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/container.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp index fc34f39..2cc8df5 100644 --- a/proton-c/bindings/cpp/src/container.cpp +++ b/proton-c/bindings/cpp/src/container.cpp @@ -81,4 +81,6 @@ void container::client_connection_options(const connection_options &o) { impl_-> void container::server_connection_options(const connection_options &o) { impl_->server_connection_options(o); } +void container::link_options(const class link_options &o) { impl_->link_options(o); } + } // namespace proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp index 55547ca..979c20c 100644 --- a/proton-c/bindings/cpp/src/container_impl.cpp +++ b/proton-c/bindings/cpp/src/container_impl.cpp @@ -137,17 +137,16 @@ container_impl::container_impl(container& c, messaging_adapter *h, const std::st reactor_.pn_handler(cpp_handler(handler_).get()); } - // Note: we have just set up the following handlers that see events in this order: - // messaging_handler (Proton C events), pn_flowcontroller (optional), messaging_adapter, - // messaging_handler (Messaging events from the messaging_adapter, i.e. the delegate), - // connector override, the reactor's default globalhandler (pn_iohandler) + // Note: we have just set up the following handlers that see + // events in this order: messaging_adapter, connector override, + // the reactor's default globalhandler (pn_iohandler) } container_impl::~container_impl() {} connection container_impl::connect(const proton::url &url, const connection_options &user_opts) { connection_options opts = client_connection_options(); // Defaults - opts.override(user_opts); + opts.update(user_opts); proton_handler *h = opts.handler(); internal::pn_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : internal::pn_ptr<pn_handler_t>(); @@ -165,9 +164,9 @@ connection container_impl::connect(const proton::url &url, const connection_opti sender container_impl::open_sender(const proton::url &url, const proton::link_options &o1, const connection_options &o2) { proton::link_options lopts(link_options_); - lopts.override(o1); + lopts.update(o1); connection_options copts(client_connection_options_); - copts.override(o2); + copts.update(o2); connection conn = connect(url, copts); std::string path = url.path(); sender snd = conn.default_session().create_sender(); @@ -178,9 +177,9 @@ sender container_impl::open_sender(const proton::url &url, const proton::link_op receiver container_impl::open_receiver(const proton::url &url, const proton::link_options &o1, const connection_options &o2) { proton::link_options lopts(link_options_); - lopts.override(o1); + lopts.update(o1); connection_options copts(client_connection_options_); - copts.override(o2); + copts.update(o2); connection conn = connect(url, copts); std::string path = url.path(); receiver rcv = conn.default_session().create_receiver(); @@ -191,7 +190,7 @@ receiver container_impl::open_receiver(const proton::url &url, const proton::lin acceptor container_impl::listen(const proton::url& url, const connection_options &user_opts) { connection_options opts = server_connection_options(); // Defaults - opts.override(user_opts); + opts.update(user_opts); proton_handler *h = opts.handler(); internal::pn_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : internal::pn_ptr<pn_handler_t>(); pn_acceptor_t *acptr = pn_reactor_acceptor(reactor_.pn_object(), url.host().c_str(), url.port().c_str(), chandler.get()); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/container_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_impl.hpp b/proton-c/bindings/cpp/src/container_impl.hpp index d1b476f..05e2058 100644 --- a/proton-c/bindings/cpp/src/container_impl.hpp +++ b/proton-c/bindings/cpp/src/container_impl.hpp @@ -83,6 +83,7 @@ class container_impl proton::link_options link_options_; friend class container; + friend class messaging_adapter; }; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/contexts.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp index c2b76f6..99b05a1 100644 --- a/proton-c/bindings/cpp/src/contexts.cpp +++ b/proton-c/bindings/cpp/src/contexts.cpp @@ -50,6 +50,7 @@ pn_class_t cpp_context_class = PN_CLASS(cpp_context); PN_HANDLE(CONNECTION_CONTEXT) PN_HANDLE(CONTAINER_CONTEXT) PN_HANDLE(LISTENER_CONTEXT) +PN_HANDLE(LINK_CONTEXT) void set_context(pn_record_t* record, pn_handle_t handle, const pn_class_t *clazz, void* value) { @@ -99,4 +100,15 @@ listener_context& listener_context::get(pn_acceptor_t* a) { return *ctx; } +link_context& link_context::get(pn_link_t* l) { + link_context* ctx = + get_context<link_context>(pn_link_attachments(l), LINK_CONTEXT); + if (!ctx) { + ctx = context::create<link_context>(); + set_context(pn_link_attachments(l), LINK_CONTEXT, context::pn_class(), ctx); + pn_decref(ctx); + } + return *ctx; +} + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/contexts.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp index b4fcdba..03271a1 100644 --- a/proton-c/bindings/cpp/src/contexts.hpp +++ b/proton-c/bindings/cpp/src/contexts.hpp @@ -127,6 +127,15 @@ class listener_context : public context { bool ssl; }; +class link_context : public context { + public: + static link_context& get(pn_link_t* l); + link_context() : credit_window(10), auto_accept(true), auto_settle(true) {} + int credit_window; + bool auto_accept; + bool auto_settle; +}; + } #endif /*!PROTON_CPP_CONTEXTS_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/handler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp index 43febf5..1119720 100644 --- a/proton-c/bindings/cpp/src/handler.cpp +++ b/proton-c/bindings/cpp/src/handler.cpp @@ -32,9 +32,7 @@ namespace proton { -handler::handler(int prefetch0, bool auto_accept0, bool auto_settle0, bool peer_close_is_error0) : - messaging_adapter_(new messaging_adapter(*this, prefetch0, auto_accept0, auto_settle0, peer_close_is_error0)) -{} +handler::handler() : messaging_adapter_(new messaging_adapter(*this)) {} handler::~handler(){} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/link_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/link_options.cpp b/proton-c/bindings/cpp/src/link_options.cpp index 534a6b6..ee41535 100644 --- a/proton-c/bindings/cpp/src/link_options.cpp +++ b/proton-c/bindings/cpp/src/link_options.cpp @@ -26,6 +26,7 @@ #include "msg.hpp" #include "messaging_adapter.hpp" +#include "contexts.hpp" namespace proton { @@ -58,7 +59,7 @@ template <class T> struct option { option() : value(), set(false) {} option& operator=(const T& x) { value = x; set = true; return *this; } - void override(const option<T>& x) { if (x.set) *this = x.value; } + void update(const option<T>& x) { if (x.set) *this = x.value; } }; class link_options::impl { @@ -71,6 +72,9 @@ class link_options::impl { option<std::string> local_address; option<enum lifetime_policy> lifetime_policy; option<std::string> selector; + option<bool> auto_accept; + option<bool> auto_settle; + option<int> credit_window; void apply(link& l) { if (l.state() & endpoint::LOCAL_UNINIT) { @@ -118,6 +122,7 @@ class link_options::impl { } } } + if (auto_settle.set) link_context::get(l.pn_object()).auto_settle = auto_settle.value; if (!sender) { // receiver only options if (distribution_mode.set) l.local_source().distribution_mode(distribution_mode.value); @@ -130,19 +135,24 @@ class link_options::impl { enc << codec::start::map() << symbol("selector") << codec::start::described() << symbol("apache.org:selector-filter:string") << binary(selector.value) << codec::finish(); } + if (auto_accept.set) link_context::get(l.pn_object()).auto_accept = auto_accept.value; + if (credit_window.set) link_context::get(l.pn_object()).credit_window = credit_window.value; } } } - void override(const impl& x) { - handler.override(x.handler); - distribution_mode.override(x.distribution_mode); - durable_subscription.override(x.durable_subscription); - delivery_mode.override(x.delivery_mode); - dynamic_address.override(x.dynamic_address); - local_address.override(x.local_address); - lifetime_policy.override(x.lifetime_policy); - selector.override(x.selector); + void update(const impl& x) { + handler.update(x.handler); + distribution_mode.update(x.distribution_mode); + durable_subscription.update(x.durable_subscription); + delivery_mode.update(x.delivery_mode); + dynamic_address.update(x.dynamic_address); + local_address.update(x.local_address); + lifetime_policy.update(x.lifetime_policy); + selector.update(x.selector); + auto_accept.update(x.auto_accept); + auto_settle.update(x.auto_settle); + credit_window.update(x.credit_window); } }; @@ -158,7 +168,7 @@ link_options& link_options::operator=(const link_options& x) { return *this; } -void link_options::override(const link_options& x) { impl_->override(*x.impl_); } +void link_options::update(const link_options& x) { impl_->update(*x.impl_); } link_options& link_options::handler(class handler *h) { impl_->handler = h->messaging_adapter_.get(); return *this; } link_options& link_options::browsing(bool b) { distribution_mode(b ? terminus::COPY : terminus::MOVE); return *this; } @@ -169,6 +179,9 @@ link_options& link_options::dynamic_address(bool b) {impl_->dynamic_address = b; link_options& link_options::local_address(const std::string &addr) {impl_->local_address = addr; return *this; } link_options& link_options::lifetime_policy(enum lifetime_policy lp) {impl_->lifetime_policy = lp; return *this; } link_options& link_options::selector(const std::string &str) {impl_->selector = str; return *this; } +link_options& link_options::auto_accept(bool b) {impl_->auto_accept = b; return *this; } +link_options& link_options::auto_settle(bool b) {impl_->auto_settle = b; return *this; } +link_options& link_options::credit_window(int w) {impl_->credit_window = w; return *this; } void link_options::apply(link& l) const { impl_->apply(l); } proton_handler* link_options::handler() const { return impl_->handler.value; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/messaging_adapter.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp index 4265644..4bc21af 100644 --- a/proton-c/bindings/cpp/src/messaging_adapter.cpp +++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp @@ -26,6 +26,7 @@ #include "contexts.hpp" #include "messaging_event.hpp" +#include "container_impl.hpp" #include "msg.hpp" #include "proton/connection.h" @@ -38,49 +39,7 @@ namespace proton { -namespace { -class c_flow_controller : public proton_handler -{ - public: - pn_handler_t *flowcontroller; - - // TODO: pn_flowcontroller requires a window > 1. - c_flow_controller(int window) : flowcontroller(pn_flowcontroller(std::max(window, 2))) {} - ~c_flow_controller() { - pn_decref(flowcontroller); - } - - void redirect(proton_event &pne) { - pn_handler_dispatch(flowcontroller, pne.pn_event(), pn_event_type_t(pne.type())); - } - - virtual void on_link_local_open(proton_event &e) { redirect(e); } - virtual void on_link_remote_open(proton_event &e) { redirect(e); } - virtual void on_link_flow(proton_event &e) { redirect(e); } - virtual void on_delivery(proton_event &e) { redirect(e); } -}; - -} // namespace - -void messaging_adapter::create_helpers() { - if (prefetch_ > 0) { - flow_controller_.reset(new c_flow_controller(prefetch_)); - add_child_handler(*flow_controller_); - } -} - -messaging_adapter::messaging_adapter(handler &delegate, - int prefetch, bool auto_accept, bool auto_settle, bool peer_close_iserror) : - delegate_(delegate), - prefetch_(prefetch), - auto_accept_(auto_accept), - auto_settle_(auto_settle), - peer_close_iserror_(peer_close_iserror) -{ - create_helpers(); - //add_child_handler(*this); -} - +messaging_adapter::messaging_adapter(handler &delegate) : delegate_(delegate) {} messaging_adapter::~messaging_adapter(){} @@ -98,11 +57,13 @@ void messaging_adapter::on_link_flow(proton_event &pe) { messaging_event mevent(messaging_event::SENDABLE, pe); delegate_.on_sendable(mevent);; } + credit_topup(lnk); } void messaging_adapter::on_delivery(proton_event &pe) { pn_event_t *cevent = pe.pn_event(); pn_link_t *lnk = pn_event_link(cevent); + link_context& lctx = link_context::get(lnk); delivery dlv = pe.delivery(); if (pn_link_is_receiver(lnk)) { @@ -118,11 +79,11 @@ void messaging_adapter::on_delivery(proton_event &pe) { mevent.message_ = &msg; mevent.message_->decode(dlv); if (pn_link_state(lnk) & PN_LOCAL_CLOSED) { - if (auto_accept_) + if (lctx.auto_accept) dlv.release(); } else { delegate_.on_message(mevent); - if (auto_accept_ && !dlv.settled()) + if (lctx.auto_accept && !dlv.settled()) dlv.accept(); } } @@ -130,6 +91,7 @@ void messaging_adapter::on_delivery(proton_event &pe) { messaging_event mevent(messaging_event::DELIVERY_SETTLE, pe); delegate_.on_delivery_settle(mevent); } + credit_topup(lnk); } else { // sender if (dlv.updated()) { @@ -151,7 +113,7 @@ void messaging_adapter::on_delivery(proton_event &pe) { messaging_event mevent(messaging_event::DELIVERY_SETTLE, pe); delegate_.on_delivery_settle(mevent); } - if (auto_settle_) + if (lctx.auto_settle) dlv.settle(); } } @@ -172,7 +134,7 @@ bool is_local_unititialised(pn_state_t state) { void messaging_adapter::on_link_remote_close(proton_event &pe) { pn_event_t *cevent = pe.pn_event(); pn_link_t *lnk = pn_event_link(cevent); - if (peer_close_iserror_ || pn_condition_is_set(pn_link_remote_condition(lnk))) { + if (pn_condition_is_set(pn_link_remote_condition(lnk))) { messaging_event mevent(messaging_event::LINK_ERROR, pe); delegate_.on_link_error(mevent); } @@ -184,7 +146,7 @@ void messaging_adapter::on_link_remote_close(proton_event &pe) { void messaging_adapter::on_session_remote_close(proton_event &pe) { pn_event_t *cevent = pe.pn_event(); pn_session_t *session = pn_event_session(cevent); - if (peer_close_iserror_ || pn_condition_is_set(pn_session_remote_condition(session))) { + if (pn_condition_is_set(pn_session_remote_condition(session))) { messaging_event mevent(messaging_event::SESSION_ERROR, pe); delegate_.on_session_error(mevent); } @@ -196,7 +158,7 @@ void messaging_adapter::on_session_remote_close(proton_event &pe) { void messaging_adapter::on_connection_remote_close(proton_event &pe) { pn_event_t *cevent = pe.pn_event(); pn_connection_t *connection = pn_event_connection(cevent); - if (peer_close_iserror_ || pn_condition_is_set(pn_connection_remote_condition(connection))) { + if (pn_condition_is_set(pn_connection_remote_condition(connection))) { messaging_event mevent(messaging_event::CONNECTION_ERROR, pe); delegate_.on_connection_error(mevent); } @@ -223,13 +185,22 @@ void messaging_adapter::on_session_remote_open(proton_event &pe) { } } +void messaging_adapter::on_link_local_open(proton_event &pe) { + credit_topup(pn_event_link(pe.pn_event())); +} + void messaging_adapter::on_link_remote_open(proton_event &pe) { messaging_event mevent(messaging_event::LINK_OPEN, pe); delegate_.on_link_open(mevent); - pn_link_t *link = pn_event_link(pe.pn_event()); - if (!is_local_open(pn_link_state(link)) && is_local_unititialised(pn_link_state(link))) { - pn_link_open(link); + pn_link_t *pnlink = pn_event_link(pe.pn_event()); + if (!is_local_open(pn_link_state(pnlink)) && is_local_unititialised(pn_link_state(pnlink))) { + link lnk(pnlink); + if (pe.container_) + lnk.open(pe.container_->impl_->link_options_); + else + lnk.open(); // No default for engine } + credit_topup(pnlink); } void messaging_adapter::on_transport_tail_closed(proton_event &pe) { @@ -251,4 +222,14 @@ void messaging_adapter::on_timer_task(proton_event& pe) delegate_.on_timer(mevent); } +void messaging_adapter::credit_topup(pn_link_t *link) { + if (link && pn_link_is_receiver(link)) { + int window = link_context::get(link).credit_window; + if (window) { + int delta = window - pn_link_credit(link); + pn_link_flow(link, delta); + } + } +} + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/messaging_adapter.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_adapter.hpp b/proton-c/bindings/cpp/src/messaging_adapter.hpp index a5b364a..655efb6 100644 --- a/proton-c/bindings/cpp/src/messaging_adapter.hpp +++ b/proton-c/bindings/cpp/src/messaging_adapter.hpp @@ -38,9 +38,7 @@ namespace proton { class messaging_adapter : public proton_handler { public: - messaging_adapter(handler &delegate, - int prefetch, bool auto_accept, bool auto_settle, - bool peer_close_is_error); + messaging_adapter(handler &delegate); virtual ~messaging_adapter(); void on_reactor_init(proton_event &e); @@ -50,6 +48,7 @@ class messaging_adapter : public proton_handler void on_connection_remote_close(proton_event &e); void on_session_remote_open(proton_event &e); void on_session_remote_close(proton_event &e); + void on_link_local_open(proton_event &e); void on_link_remote_open(proton_event &e); void on_link_remote_close(proton_event &e); void on_transport_tail_closed(proton_event &e); @@ -57,12 +56,7 @@ class messaging_adapter : public proton_handler private: handler &delegate_; // The handler for generated messaging_event's - int prefetch_; - bool auto_accept_; - bool auto_settle_; - bool peer_close_iserror_; - internal::pn_unique_ptr<proton_handler> flow_controller_; - void create_helpers(); + void credit_topup(pn_link_t*); }; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/proton-c/bindings/cpp/src/proton_event.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_event.hpp b/proton-c/bindings/cpp/src/proton_event.hpp index 8dd2f8f..c671eb9 100644 --- a/proton-c/bindings/cpp/src/proton_event.hpp +++ b/proton-c/bindings/cpp/src/proton_event.hpp @@ -294,6 +294,7 @@ class proton_event class container *container_; friend class messaging_event; friend class connection_engine; + friend class messaging_adapter; }; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2ae27d7/tests/tools/apps/cpp/reactor_send.cpp ---------------------------------------------------------------------- diff --git a/tests/tools/apps/cpp/reactor_send.cpp b/tests/tools/apps/cpp/reactor_send.cpp index 224ac71..5700c73 100644 --- a/tests/tools/apps/cpp/reactor_send.cpp +++ b/tests/tools/apps/cpp/reactor_send.cpp @@ -29,6 +29,7 @@ #include "proton/event.hpp" #include "proton/reactor.h" #include "proton/value.hpp" +#include "proton/link_options.hpp" #include <iostream> #include <map> @@ -53,8 +54,7 @@ class reactor_send : public proton::handler { public: reactor_send(const std::string &url, int c, int size, bool replying) - : handler(1024), // prefetch=1024 - url_(url), sent_(0), confirmed_(0), total_(c), + : url_(url), sent_(0), confirmed_(0), total_(c), received_(0), received_bytes_(0), replying_(replying) { if (replying_) message_.reply_to("localhost/test"); @@ -64,6 +64,7 @@ class reactor_send : public proton::handler { } void on_start(proton::event &e) { + e.container().link_options(proton::link_options().credit_window(1024)); e.container().open_sender(url_); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org