This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git
The following commit(s) were added to refs/heads/main by this push:
     new 462e9568f If cannot deliver incoming message due to queue limit then RELEASE message
462e9568f is described below

commit 462e9568f81241ac4924d75bf7e551bea185e82d
Author: Pete Fawcett <p...@fawcett.co.uk>
AuthorDate: Fri May 27 13:47:29 2022 +0100

    If cannot deliver incoming message due to queue limit then RELEASE message
---
 src/qpid/broker/Queue.cpp         |  6 ++++-
 src/qpid/broker/amqp/Incoming.cpp | 16 +++++++++----
 src/qpid/broker/amqp/Session.cpp  | 50 +++++++++++++++++----------------------
 3 files changed, 39 insertions(+), 33 deletions(-)

diff --git a/src/qpid/broker/Queue.cpp b/src/qpid/broker/Queue.cpp
index 16ac65946..7baaaa254 100644
--- a/src/qpid/broker/Queue.cpp
+++ b/src/qpid/broker/Queue.cpp
@@ -1661,7 +1661,11 @@ bool Queue::checkDepth(const QueueDepth& increment, const Message&)
             if (brokerMgmtObject)
                 brokerMgmtObject->inc_discardsOverflow();
         }
-        throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name << ": current=[" << current << "], max=[" << settings.maxDepth << "]"));
+        throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name <<
+                                                      ": current=[" << current <<
+                                                      "], max=[" << settings.maxDepth <<
+                                                      "], increment=[" << increment <<
+                                                      "]"));
     } else {
         current += increment;
         return true;
diff --git a/src/qpid/broker/amqp/Incoming.cpp b/src/qpid/broker/amqp/Incoming.cpp
index 0507aade4..5e851a8bc 100644
--- a/src/qpid/broker/amqp/Incoming.cpp
+++ b/src/qpid/broker/amqp/Incoming.cpp
@@ -154,9 +154,17 @@ void DecodingIncoming::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>
     }
     userid.verify(message.getUserId());
     received->begin();
-    handle(message, session.getTransaction(delivery));
-    Transfer t(delivery, sessionPtr);
-    sessionPtr->pending_accept(delivery);
-    received->end(t);
+
+    try {
+        handle(message, session.getTransaction(delivery));
+        Transfer t(delivery, sessionPtr);
+        sessionPtr->pending_accept(delivery);
+        received->end(t);
+    } catch (const qpid::framing::ResourceLimitExceededException& e) {
+        pn_delivery_update(delivery, PN_RELEASED);
+        pn_delivery_settle(delivery);
+    } catch (const qpid::SessionException& e) {
+        throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
+    }
 }
 }}} // namespace qpid::broker::amqp
diff --git a/src/qpid/broker/amqp/Session.cpp b/src/qpid/broker/amqp/Session.cpp
index d1a0f38bb..684235cf0 100644
--- a/src/qpid/broker/amqp/Session.cpp
+++ b/src/qpid/broker/amqp/Session.cpp
@@ -948,28 +948,27 @@ void IncomingToQueue::handle(qpid::broker::Message& message, qpid::broker::TxBuf
         msg << " Queue " << queue->getName() << " has been deleted";
         throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, msg.str());
     }
+
     try {
         queue->deliver(message, transaction);
-    } catch (const qpid::SessionException& e) {
-        throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
     }
+    catch (const qpid::Exception& e) {
+        QPID_LOG(warning, "Cannot deliver to queue " << queue->getName() << ": " << e.what());
+        throw;
+    }
+
 }

 void IncomingToExchange::handle(qpid::broker::Message& message, qpid::broker::TxBuffer* transaction)
 {
-    if (exchange->isDestroyed())
+    if (exchange->isDestroyed()) {
         throw qpid::framing::ResourceDeletedException(QPID_MSG("Exchange " << exchange->getName() << " has been deleted."));
-    try {
-        authorise.route(exchange, message);
-        DeliverableMessage deliverable(message, transaction);
-        exchange->route(deliverable);
-        if (!deliverable.delivered) {
-            if (exchange->getAlternate()) {
-                exchange->getAlternate()->route(deliverable);
-            }
-        }
-    } catch (const qpid::SessionException& e) {
-        throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
+    }
+    authorise.route(exchange, message);
+    DeliverableMessage deliverable(message, transaction);
+    exchange->route(deliverable);
+    if (!deliverable.delivered && exchange->getAlternate()) {
+        exchange->getAlternate()->route(deliverable);
     }
 }

@@ -993,21 +992,16 @@ void AnonymousRelay::handle(qpid::broker::Message& message, qpid::broker::TxBuff
         }
     }

-    try {
-        if (queue) {
-            authorise.incoming(queue);
-            queue->deliver(message, transaction);
-        } else if (exchange) {
-            authorise.route(exchange, message);
-            DeliverableMessage deliverable(message, transaction);
-            exchange->route(deliverable);
-        } else {
-            QPID_LOG(info, "AnonymousRelay dropping message for " << dest);
-        }
-    } catch (const qpid::SessionException& e) {
-        throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
+    if (queue) {
+        authorise.incoming(queue);
+        queue->deliver(message, transaction);
+    } else if (exchange) {
+        authorise.route(exchange, message);
+        DeliverableMessage deliverable(message, transaction);
+        exchange->route(deliverable);
+    } else {
+        QPID_LOG(info, "AnonymousRelay dropping message for " << dest);
     }
-
 }

 void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> message, pn_delivery_t* delivery)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org