Author: aconway Date: Mon Dec 1 17:41:09 2014 New Revision: 1642720 URL: http://svn.apache.org/r1642720 Log: QPID-6252: AMQP 1.0 browsing client generates large number of errors on broker.
The problem was that messages for browsing receivers were being recorded on the client SessionContext unacked list. This is incorrect since you don't ack browsed messages. They remained on the list after the browsing receiver was closed, and every subsequent call to acknowledge() on the client would attempt to ack these messages for a no-longer-existing link. Fix is to not record browsed messages. Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h?rev=1642720&r1=1642719&r2=1642720&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h Mon Dec 1 17:41:09 2014 @@ -44,6 +44,7 @@ class AddressHelper const qpid::types::Variant::Map& getNodeProperties() const; bool getLinkSource(std::string& out) const; bool getLinkTarget(std::string& out) const; + bool getBrowse() const { return browse; } const qpid::types::Variant::Map& getLinkProperties() const; static std::string getLinkName(const Address& address); private: Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1642720&r1=1642719&r2=1642720&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Mon Dec 1 17:41:09 2014 @@ -292,7 +292,7 @@ bool ConnectionContext::get(boost::share QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: "); encoded->init(impl); impl.setEncoded(encoded); - impl.setInternalId(ssn->record(current)); + impl.setInternalId(ssn->record(current, lnk->getBrowse())); pn_link_advance(lnk->receiver); if (lnk->capacity) { pn_link_flow(lnk->receiver, 1); Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1642720&r1=1642719&r2=1642720&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Mon Dec 1 17:41:09 2014 @@ -36,7 +36,9 @@ ReceiverContext::ReceiverContext(pn_sess address(a), helper(address), receiver(pn_receiver(session, name.c_str())), - capacity(0), used(0) {} + capacity(0), used(0) +{} + ReceiverContext::~ReceiverContext() { //pn_link_free(receiver); Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1642720&r1=1642719&r2=1642720&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Mon Dec 1 17:41:09 2014 @@ -60,6 +60,8 @@ class ReceiverContext void verify(); Address getAddress() const; bool hasCurrent(); + bool getBrowse() const { return helper.getBrowse(); } + private: friend class ConnectionContext; const std::string name; Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1642720&r1=1642719&r2=1642720&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Mon Dec 1 17:41:09 2014 @@ -110,10 +110,10 @@ uint32_t SessionContext::getUnsettledAck return 0;//TODO } -qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) +qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery, bool browse) { qpid::framing::SequenceNumber id = next++; - unacked[id] = delivery; + if (!browse) unacked[id] = delivery; QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery); return id; } Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1642720&r1=1642719&r2=1642720&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Mon Dec 1 17:41:09 2014 @@ -75,7 +75,7 @@ class SessionContext qpid::framing::SequenceNumber next; std::string name; - qpid::framing::SequenceNumber record(pn_delivery_t*); + qpid::framing::SequenceNumber record(pn_delivery_t*, bool browse); void acknowledge(); void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org