Repository: qpid-cpp Updated Branches: refs/heads/master e7400a0e0 -> 74902a52d
QPID-7415: introduce special handling for different delivery statuses Introduces two new connection options (AMQP 1.0 only): * max_delivery_attempts determines how many times we try to resend a 'released' message. A value of 0, which is the default, retries indefinitely. * raise_rejected determines whether a MessageRejected exception is raised when a message is 'rejected'; the default is true. A message is considered 'released' if the outcome is released, or if the outcome is modified and the 'undeliverable-here' flag is not set. A message is considered 'rejected' if the outcome is rejected, if the outcome is modified and the 'undeliverable-here' flag is set, or if it was 'released' but we have reached the maximum number of delivery attempts. Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/74902a52 Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/74902a52 Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/74902a52 Branch: refs/heads/master Commit: 74902a52d93215705d0538067a07184c99b70206 Parents: e7400a0 Author: Gordon Sim <g...@redhat.com> Authored: Mon Sep 5 20:50:22 2016 +0100 Committer: Gordon Sim <g...@redhat.com> Committed: Wed Sep 28 20:12:40 2016 +0100 ---------------------------------------------------------------------- src/qpid/messaging/ConnectionOptions.cpp | 9 +- src/qpid/messaging/ConnectionOptions.h | 3 + src/qpid/messaging/amqp/ConnectionContext.cpp | 14 +- src/qpid/messaging/amqp/EncodedMessage.cpp | 1 + src/qpid/messaging/amqp/SenderContext.cpp | 156 ++++++++++++++++++--- src/qpid/messaging/amqp/SenderContext.h | 35 ++++- src/qpid/messaging/amqp/SessionContext.cpp | 8 +- src/qpid/messaging/amqp/SessionContext.h | 3 +- src/qpid/messaging/amqp/Transaction.cpp | 2 +- 9 files changed, 194 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/ConnectionOptions.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/ConnectionOptions.cpp b/src/qpid/messaging/ConnectionOptions.cpp index 3095169..24ea1f8 100644 --- a/src/qpid/messaging/ConnectionOptions.cpp +++ b/src/qpid/messaging/ConnectionOptions.cpp @@ -52,7 +52,8 @@ void merge(const qpid::types::Variant::List& from, std::vector<std::string>& to) ConnectionOptions::ConnectionOptions(const std::map<std::string, qpid::types::Variant>& options) : replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), minReconnectInterval(0.001), maxReconnectInterval(2), - retries(0), reconnectOnLimitExceeded(true), nestAnnotations(false), setToOnSend(false) + retries(0), reconnectOnLimitExceeded(true), nestAnnotations(false), setToOnSend(false), + maxDeliveryAttempts(0), raiseRejected(true), redeliveryTimeout(0) { // By default we want the sasl service name to be "amqp" for 1.0 // this will be overridden by a parsed "sasl-service" option @@ -127,6 +128,12 @@ void ConnectionOptions::set(const std::string& name, const qpid::types::Variant& setToOnSend = value; } else if (name == "address-passthrough" || name == "address_passthrough") { addressPassthrough = value; + } else if (name == "max-delivery-attempts" || name == "max_delivery_attempts") { + maxDeliveryAttempts = value; + } else if (name == "raise-rejected" || name == "raise_rejected") { + raiseRejected = value; + } else if (name == "redelivery-timeout" || name == "redelivery_timeout") { + redeliveryTimeout = timeValue(value); } else if (name == "properties" || name == "client-properties" || name == "client_properties") { properties = value.asMap(); } else { http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/ConnectionOptions.h ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/ConnectionOptions.h 
b/src/qpid/messaging/ConnectionOptions.h index 6b89838..a159e0b 100644 --- a/src/qpid/messaging/ConnectionOptions.h +++ b/src/qpid/messaging/ConnectionOptions.h @@ -49,6 +49,9 @@ struct ConnectionOptions : qpid::client::ConnectionSettings bool nestAnnotations; bool setToOnSend; boost::optional<bool> addressPassthrough; + uint32_t maxDeliveryAttempts; + bool raiseRejected; + double redeliveryTimeout; std::map<std::string, qpid::types::Variant> properties; QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&); http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/ConnectionContext.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/ConnectionContext.cpp b/src/qpid/messaging/amqp/ConnectionContext.cpp index ff6a7be..94a549f 100644 --- a/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -199,7 +199,11 @@ void ConnectionContext::close() if (state != CONNECTED) return; if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) { for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { - syncLH(i->second, l); + try { + syncLH(i->second, l); + } catch (const MessageRejected& e) { + QPID_LOG(error, "Could not sync session on connection close due to message rejection (use explicit sync to handle exception): " << e.what()); + } if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) { pn_session_close(i->second->session); } @@ -493,7 +497,9 @@ qpid::messaging::Address ConnectionContext::passthrough(const qpid::messaging::A boost::shared_ptr<SenderContext> ConnectionContext::createSender(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address) { sys::Monitor::ScopedLock l(lock); - boost::shared_ptr<SenderContext> sender = session->createSender(usePassthrough() ? 
passthrough(address) : address, setToOnSend); + boost::shared_ptr<SenderContext> sender = + session->createSender(usePassthrough() ? passthrough(address) : address, + SenderOptions(setToOnSend, maxDeliveryAttempts, raiseRejected, redeliveryTimeout * qpid::sys::TIME_SEC)); try { attach(session, sender); return sender; @@ -565,10 +571,6 @@ void ConnectionContext::sendLH( QPID_LOG(debug, "Waiting for confirmation..."); wait(ssn, snd);//wait until message has been confirmed } - if ((*delivery)->rejected()) { - throw MessageRejected("Message was rejected by peer"); - } - } } http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/EncodedMessage.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/EncodedMessage.cpp b/src/qpid/messaging/amqp/EncodedMessage.cpp index cf60046..10c8286 100644 --- a/src/qpid/messaging/amqp/EncodedMessage.cpp +++ b/src/qpid/messaging/amqp/EncodedMessage.cpp @@ -51,6 +51,7 @@ EncodedMessage::EncodedMessage() : size(0), data(0), nestAnnotations(false) EncodedMessage::EncodedMessage(const EncodedMessage& other) : size(other.size), data(size ? 
new char[size] : 0), nestAnnotations(false) { init(); + ::memcpy(data, other.data, size); } void EncodedMessage::init() http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/SenderContext.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/SenderContext.cpp b/src/qpid/messaging/amqp/SenderContext.cpp index fe8b4d3..a3ffb15 100644 --- a/src/qpid/messaging/amqp/SenderContext.cpp +++ b/src/qpid/messaging/amqp/SenderContext.cpp @@ -42,19 +42,26 @@ extern "C" { namespace qpid { namespace messaging { + +MessageReleased::MessageReleased(const std::string& msg) : SendError(msg) {} + namespace amqp { +SenderOptions::SenderOptions(bool setToOnSend_, uint32_t maxDeliveryAttempts_, bool raiseRejected_, const qpid::sys::Duration& d) + : setToOnSend(setToOnSend_), maxDeliveryAttempts(maxDeliveryAttempts_), raiseRejected(raiseRejected_), redeliveryTimeout(d) {} + + //TODO: proper conversion to wide string for address SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a, - bool setToOnSend_, + const SenderOptions& o, const CoordinatorPtr& coord) : sender(pn_sender(session, n.c_str())), name(n), address(a), helper(address), nextId(0), capacity(50), unreliable(helper.isUnreliable()), - setToOnSend(setToOnSend_), + options(o), transaction(coord) {} @@ -103,15 +110,15 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext: state = transaction->getSendState(); if (unreliable) { Delivery delivery(nextId++); - delivery.encode(MessageImplAccess::get(message), address, setToOnSend); + delivery.encode(MessageImplAccess::get(message), address, options.setToOnSend); delivery.send(sender, unreliable, state); *out = 0; return true; } else { - deliveries.push_back(Delivery(nextId++)); + deliveries.push_back(Delivery(nextId++, options.maxDeliveryAttempts, options.redeliveryTimeout)); try { Delivery& delivery = 
deliveries.back(); - delivery.encode(MessageImplAccess::get(message), address, setToOnSend); + delivery.encode(MessageImplAccess::get(message), address, options.setToOnSend); delivery.send(sender, unreliable, state); *out = &delivery; return true; @@ -140,11 +147,35 @@ uint32_t SenderContext::processUnsettled(bool silent) if (!silent) { check(); } + bool resend_required = false; //remove messages from front of deque once peer has confirmed receipt - while (!deliveries.empty() && deliveries.front().delivered() && !(pn_link_state(sender) & PN_REMOTE_CLOSED)) { - deliveries.front().settle(); - deliveries.pop_front(); + while (!deliveries.empty() && !(pn_link_state(sender) & PN_REMOTE_CLOSED)) { + try { + if (deliveries.front().delivered()) { + deliveries.front().settle(); + deliveries.pop_front(); + } else { + break; + } + } catch (const MessageReleased& e) { + //mark it eligible for resending, + deliveries.front().settleAndReset(); + //and move it to the back + deliveries.push_back(deliveries.front()); + deliveries.pop_front(); + resend_required = true; + } catch (const MessageRejected& e) { + deliveries.front().settle(); + if (options.raiseRejected) { + QPID_LOG(info, e.what()); + throw; + } else { + QPID_LOG(warning, e.what()); + deliveries.pop_front(); + } + } } + if (resend_required) resend(); return deliveries.size(); } namespace { @@ -446,7 +477,16 @@ bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messagi } -SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0), presettled(false) {} +namespace{ +qpid::sys::AbsTime until(const qpid::sys::Duration& d) +{ + return d ? 
qpid::sys::AbsTime(qpid::sys::now(), d) : qpid::sys::FAR_FUTURE; +} +} + +SenderContext::Delivery::Delivery(int32_t i, const uint32_t max_attempts_, const qpid::sys::Duration& max_time) : + id(i), token(0), settled(false), attempts(0), max_attempts(max_attempts_), + retry_until(until(max_time)) {} void SenderContext::Delivery::reset() { @@ -517,6 +557,7 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state) { + ++attempts; pn_delivery_tag_t tag; tag.size = sizeof(id); #ifdef NO_PROTON_DELIVERY_TAG_T @@ -532,22 +573,36 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const typ } pn_link_send(sender, encoded.getData(), encoded.getSize()); if (unreliable) { - pn_delivery_settle(token); - presettled = true; + settle(); } pn_link_advance(sender); } bool SenderContext::Delivery::sent() const { - return presettled || token; + return settled || token; } bool SenderContext::Delivery::delivered() { - if (presettled || (token && (pn_delivery_remote_state(token) || pn_delivery_settled(token)))) { - //TODO: need a better means for signalling outcomes other than accepted - if (rejected()) { - QPID_LOG(warning, "delivery " << id << " was rejected by peer"); + if (settled) { + return true; + } else if (token && (pn_delivery_remote_state(token) || pn_delivery_settled(token))) { + if (delivery_refused()) { + throw MessageRejected(Msg() << "delivery " << id << " refused: " << getStatus()); + } else if (not_delivered()) { + if (max_attempts && (attempts >= max_attempts)) { + throw MessageRejected(Msg() << "delivery " << id << " cannot be delivered after " << attempts << " attempts"); + } else if (qpid::sys::now() > retry_until) { + throw MessageRejected(Msg() << "delivery " << id << " cannot be delivered, timed out after " << attempts << " attempts"); + } else { + std::string status = getStatus(); + if (max_attempts) { + 
QPID_LOG(info, "delivery " << id << " failed attempt " << attempts << " of " << max_attempts << ": " << status); + } else { + QPID_LOG(info, "delivery " << id << " was not successful: " << status); + } + throw MessageReleased(Msg() << "delivery " << id << " was not successful: " << status); + } } else if (!accepted()) { QPID_LOG(info, "delivery " << id << " was not accepted by peer"); } @@ -556,18 +611,66 @@ bool SenderContext::Delivery::delivered() return false; } } +std::string SenderContext::Delivery::getStatus() +{ + if (rejected()) { + pn_disposition_t* d = token ? pn_delivery_remote(token) : 0; + if (d) { + pn_condition_t* c = pn_disposition_condition(d); + if (c && pn_condition_is_set(c)) { + return Msg() << pn_condition_get_name(c) << ": " << pn_condition_get_description(c); + } + } + return "rejected"; + } else if (released()) { + return "released"; + } else if (delivery_refused()) { + return "undeliverable-here"; + } else if (not_delivered()) { + return "delivery-failed"; + } else if (modified()) { + return "modified"; + } + return ""; +} bool SenderContext::Delivery::accepted() { - return pn_delivery_remote_state(token) == PN_ACCEPTED; + return token && pn_delivery_remote_state(token) == PN_ACCEPTED; } bool SenderContext::Delivery::rejected() { - return pn_delivery_remote_state(token) == PN_REJECTED; + return token && pn_delivery_remote_state(token) == PN_REJECTED; +} +bool SenderContext::Delivery::released() +{ + return token && pn_delivery_remote_state(token) == PN_RELEASED; +} +bool SenderContext::Delivery::modified() +{ + return token && pn_delivery_remote_state(token) == PN_MODIFIED; +} +bool SenderContext::Delivery::delivery_refused() +{ + if (modified()) { + pn_disposition_t* d = token ? pn_delivery_remote(token) : 0; + return d && pn_disposition_is_undeliverable(d); + } else { + return rejected(); + } +} +bool SenderContext::Delivery::not_delivered() +{ + if (modified()) { + pn_disposition_t* d = token ? 
pn_delivery_remote(token) : 0; + return d && !pn_disposition_is_undeliverable(d); + } else { + return released(); + } } std::string SenderContext::Delivery::error() { - pn_condition_t *condition = pn_disposition_condition(pn_delivery_remote(token)); + pn_condition_t *condition = token ? pn_disposition_condition(pn_delivery_remote(token)) : 0; return (condition && pn_condition_is_set(condition)) ? Msg() << get_error_string(condition, std::string(), std::string()) : std::string(); @@ -575,7 +678,18 @@ std::string SenderContext::Delivery::error() void SenderContext::Delivery::settle() { - pn_delivery_settle(token); + if (!settled) { + pn_delivery_settle(token); + token = 0; // can no longer use the delivery + settled = true; + } +} +void SenderContext::Delivery::settleAndReset() +{ + //settle current delivery: + settle(); + //but treat message as unsent: + settled = false; } void SenderContext::verify() { @@ -635,7 +749,7 @@ void SenderContext::reset(pn_session_t* session) void SenderContext::resend() { - for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && pn_link_credit(sender) && !i->sent(); ++i) { + for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && !i->sent(); ++i) { i->send(sender, false/*only resend reliable transfers*/); } } http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/SenderContext.h ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/SenderContext.h b/src/qpid/messaging/amqp/SenderContext.h index 467a8e0..8bed808 100644 --- a/src/qpid/messaging/amqp/SenderContext.h +++ b/src/qpid/messaging/amqp/SenderContext.h @@ -26,6 +26,7 @@ #include <vector> #include <boost/shared_ptr.hpp> #include "qpid/sys/IntegerTypes.h" +#include "qpid/sys/Time.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/amqp/AddressHelper.h" #include "qpid/messaging/amqp/EncodedMessage.h" @@ -41,10 +42,24 @@ namespace messaging { class 
Message; class MessageImpl; +struct MessageReleased : public SendError +{ + MessageReleased(const std::string&); +}; + namespace amqp { class Transaction; +class SenderOptions { + public: + bool setToOnSend; + uint32_t maxDeliveryAttempts; + bool raiseRejected; + qpid::sys::Duration redeliveryTimeout; + + SenderOptions(bool setToOnSend, uint32_t maxDeliveryAttempts, bool raiseRejected, const qpid::sys::Duration& redeliveryTimeout=qpid::sys::Duration(0)); +}; class SenderContext { @@ -52,14 +67,19 @@ class SenderContext class Delivery { public: - Delivery(int32_t id); + Delivery(int32_t id, const uint32_t max_attempts = 0, const qpid::sys::Duration& = qpid::sys::Duration(0)); void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&, bool setToField); void send(pn_link_t*, bool unreliable, const types::Variant& state=types::Variant()); bool delivered(); bool accepted(); bool rejected(); + bool released(); + bool modified(); + bool delivery_refused(); + bool not_delivered(); void settle(); void reset(); + void settleAndReset(); bool sent() const; pn_delivery_t* getToken() const { return token; } std::string error(); @@ -67,14 +87,19 @@ class SenderContext int32_t id; pn_delivery_t* token; EncodedMessage encoded; - bool presettled; - }; + bool settled; + uint32_t attempts; + const uint32_t max_attempts; + const qpid::sys::AbsTime retry_until; + + std::string getStatus(); +}; typedef boost::shared_ptr<Transaction> CoordinatorPtr; SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target, - bool setToOnSend, + const SenderOptions&, const CoordinatorPtr& transaction = CoordinatorPtr()); virtual ~SenderContext(); @@ -107,7 +132,7 @@ class SenderContext Deliveries deliveries; uint32_t capacity; bool unreliable; - bool setToOnSend; + const SenderOptions options; boost::shared_ptr<Transaction> transaction; uint32_t processUnsettled(bool silent); 
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/SessionContext.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/SessionContext.cpp b/src/qpid/messaging/amqp/SessionContext.cpp index 92bdea7..3420a64 100644 --- a/src/qpid/messaging/amqp/SessionContext.cpp +++ b/src/qpid/messaging/amqp/SessionContext.cpp @@ -50,14 +50,14 @@ SessionContext::~SessionContext() pn_session_free(session); } -boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, bool setToOnSend) +boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, const SenderOptions& options) { error.raise(); std::string name = AddressHelper::getLinkName(address); if (senders.find(name) != senders.end()) throw LinkError("Link name must be unique within the scope of the connection"); boost::shared_ptr<SenderContext> s( - new SenderContext(session, name, address, setToOnSend, transaction)); + new SenderContext(session, name, address, options, transaction)); senders[name] = s; return s; } @@ -208,6 +208,10 @@ bool SessionContext::settled() for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { try { if (!i->second->closed() && !i->second->settled()) result = false; + } catch (const MessageRejected&) { + throw; + } catch (const MessageReleased&) { + throw; } catch (const std::exception&) { senders.erase(i); throw; http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/SessionContext.h ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/SessionContext.h b/src/qpid/messaging/amqp/SessionContext.h index 67b3c1e..8c28208 100644 --- a/src/qpid/messaging/amqp/SessionContext.h +++ b/src/qpid/messaging/amqp/SessionContext.h @@ -42,6 +42,7 @@ namespace amqp { class ConnectionContext; class SenderContext; +class SenderOptions; class 
ReceiverContext; class Transaction; @@ -54,7 +55,7 @@ class SessionContext SessionContext(pn_connection_t*); ~SessionContext(); void reset(pn_connection_t*); - boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address, bool setToOnSend); + boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address, const SenderOptions&); boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address); boost::shared_ptr<SenderContext> getSender(const std::string& name) const; boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const; http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/74902a52/src/qpid/messaging/amqp/Transaction.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/Transaction.cpp b/src/qpid/messaging/amqp/Transaction.cpp index 754b00d..9c4bfeb 100644 --- a/src/qpid/messaging/amqp/Transaction.cpp +++ b/src/qpid/messaging/amqp/Transaction.cpp @@ -42,7 +42,7 @@ const std::string ADDRESS("tx-transaction;{link:{reliability:at-least-once}}"); } Transaction::Transaction(pn_session_t* session) : - SenderContext(session, TX_COORDINATOR, Address(ADDRESS), false), committing(false) + SenderContext(session, TX_COORDINATOR, Address(ADDRESS), SenderOptions(false, true, true)), committing(false) {} void Transaction::clear() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org