Author: gsim Date: Wed Nov 23 16:01:25 2011 New Revision: 1205467 URL: http://svn.apache.org/viewvc?rev=1205467&view=rev Log: QPID-3629: Changed management of credit window
Added: qpid/trunk/qpid/cpp/src/qpid/broker/Credit.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Credit.h Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt qpid/trunk/qpid/cpp/src/Makefile.am qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp qpid/trunk/qpid/cpp/xml/cluster.xml qpid/trunk/qpid/python/qpid/testlib.py qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1205467&r1=1205466&r2=1205467&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original) +++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Wed Nov 23 16:01:25 2011 @@ -993,6 +993,7 @@ set (qpidbroker_SOURCES qpid/amqp_0_10/Connection.h qpid/amqp_0_10/Connection.cpp qpid/broker/Broker.cpp + qpid/broker/Credit.cpp qpid/broker/Exchange.cpp qpid/broker/ExpiryPolicy.cpp qpid/broker/Fairshare.cpp Modified: qpid/trunk/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1205467&r1=1205466&r2=1205467&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/Makefile.am (original) +++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Nov 23 16:01:25 2011 @@ -534,6 +534,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/ConnectionState.h \ qpid/broker/ConnectionToken.h \ qpid/broker/Consumer.h \ + qpid/broker/Credit.h \ + qpid/broker/Credit.cpp \ qpid/broker/Daemon.cpp \ qpid/broker/Daemon.h \ qpid/broker/Deliverable.h \ Added: qpid/trunk/qpid/cpp/src/qpid/broker/Credit.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Credit.cpp?rev=1205467&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Credit.cpp (added) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Credit.cpp Wed Nov 23 16:01:25 2011 @@ -0,0 +1,151 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Credit.h" + +namespace qpid { +namespace broker { + +const uint32_t CreditBalance::INFINITE_CREDIT(0xFFFFFFFF); +CreditBalance::CreditBalance() : balance(0) {} +CreditBalance::~CreditBalance() {} +void CreditBalance::clear() { balance = 0; } +void CreditBalance::grant(uint32_t value) +{ + if (balance != INFINITE_CREDIT) { + if (value == INFINITE_CREDIT) { + balance = INFINITE_CREDIT; + } else if (INFINITE_CREDIT - balance > value) { + balance += value; + } else { + balance = INFINITE_CREDIT - 1; + } + } +} +void CreditBalance::consume(uint32_t value) { if (!unlimited()) balance -= value; } +bool CreditBalance::check(uint32_t required) const { return balance >= required; } +uint32_t CreditBalance::remaining() const { return balance; } +uint32_t CreditBalance::allocated() const { return balance; } +bool CreditBalance::unlimited() const { return balance == INFINITE_CREDIT; } + +CreditWindow::CreditWindow() : used(0) {} +bool CreditWindow::check(uint32_t required) const { return CreditBalance::check(used + required); } +void CreditWindow::consume(uint32_t value) { if (!unlimited()) used += value; } +void CreditWindow::move(uint32_t value) { if (!unlimited()) used -= value; } +uint32_t CreditWindow::remaining() const { return allocated() - used; } +uint32_t CreditWindow::consumed() const { return used; } + +Credit::Credit() : windowing(true) {} +void Credit::setWindowMode(bool b) { windowing = b; } +bool Credit::isWindowMode() const { return windowing; } +void Credit::addByteCredit(uint32_t value) +{ + bytes().grant(value); +} +void Credit::addMessageCredit(uint32_t value) +{ + messages().grant(value); +} +void Credit::cancel() +{ + messages().clear(); + bytes().clear(); +} +void Credit::moveWindow(uint32_t m, uint32_t b) +{ + if (windowing) { + window.messages.move(m); + window.bytes.move(b); + } +} +void Credit::consume(uint32_t m, uint32_t b) +{ + messages().consume(m); + bytes().consume(b); +} +bool Credit::check(uint32_t m, uint32_t b) const +{ + return messages().check(m) && bytes().check(b); +} +CreditPair<uint32_t> Credit::used() const +{ + CreditPair<uint32_t> result; + if (windowing) { + result.messages = window.messages.consumed(); + result.bytes = window.bytes.consumed(); + } else { + result.messages = 0; + result.bytes = 0; + } + return result; +} +CreditPair<uint32_t> Credit::allocated() const +{ + CreditPair<uint32_t> result; + result.messages = messages().allocated(); + result.bytes = bytes().allocated(); + return result; +} +Credit::operator bool() const +{ + return check(1,1); +} +CreditBalance& Credit::messages() +{ + if (windowing) return window.messages; + else return balance.messages; +} +CreditBalance& Credit::bytes() +{ + if (windowing) return window.bytes; + else return balance.bytes; +} +const CreditBalance& Credit::messages() const +{ + if (windowing) return window.messages; + else return balance.messages; +} +const CreditBalance& Credit::bytes() const +{ + if (windowing) return window.bytes; + else return balance.bytes; +} +std::ostream& operator<<(std::ostream& out, const CreditBalance& b) +{ + if (b.unlimited()) return out << "unlimited"; + else return out << b.balance; +} +std::ostream& operator<<(std::ostream& out, const CreditWindow& w) +{ + if (w.unlimited()) return out << ((CreditBalance) w); + else return out << w.remaining() << " (from window of " << w.allocated() << ")"; +} +template <class T> +std::ostream& operator<<(std::ostream& out, const CreditPair<T>& pair) +{ + return out << "messages: " << pair.messages << " bytes: " << pair.bytes; +} +std::ostream& operator<<(std::ostream& out, const Credit& c) +{ + if (c.windowing) return out << c.window; + else return out << c.balance; +} + +}} // namespace qpid::broker Added: qpid/trunk/qpid/cpp/src/qpid/broker/Credit.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Credit.h?rev=1205467&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Credit.h (added) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Credit.h Wed Nov 23 16:01:25 2011 @@ -0,0 +1,96 @@ +#ifndef QPID_BROKER_CREDIT_H +#define QPID_BROKER_CREDIT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/IntegerTypes.h" +#include <memory> +#include <ostream> + +namespace qpid { +namespace broker { + +class CreditBalance { + public: + CreditBalance(); + virtual ~CreditBalance(); + void clear(); + void grant(uint32_t value); + virtual void consume(uint32_t value); + virtual bool check(uint32_t required) const; + virtual uint32_t remaining() const; + uint32_t allocated() const; + bool unlimited() const; + static const uint32_t INFINITE_CREDIT; + friend std::ostream& operator<<(std::ostream&, const CreditBalance&); + private: + uint32_t balance; +}; + +class CreditWindow : public CreditBalance { + public: + CreditWindow(); + bool check(uint32_t required) const; + void consume(uint32_t value); + void move(uint32_t value); + uint32_t remaining() const; + uint32_t consumed() const; + friend std::ostream& operator<<(std::ostream&, const CreditWindow&); + private: + uint32_t used; +}; + +template<class T> struct CreditPair +{ + T messages; + T bytes; +}; + +class Credit { + public: + Credit(); + void setWindowMode(bool); + bool isWindowMode() const; + void addByteCredit(uint32_t); + void addMessageCredit(uint32_t); + void consume(uint32_t messages, uint32_t bytes); + void moveWindow(uint32_t messages, uint32_t bytes); + bool check(uint32_t messages, uint32_t bytes) const; + void cancel(); + operator bool() const; + CreditPair<uint32_t> allocated() const; + CreditPair<uint32_t> used() const; + friend std::ostream& operator<<(std::ostream&, const Credit&); + private: + CreditPair<CreditBalance> balance; + CreditPair<CreditWindow> window; + bool windowing; + CreditBalance& bytes(); + CreditBalance& messages(); + const CreditBalance& bytes() const; + const CreditBalance& messages() const; +}; + +std::ostream& operator<<(std::ostream&, const Credit&); + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CREDIT_H*/ Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1205467&r1=1205466&r2=1205467&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Nov 23 16:01:25 2011 @@ -285,15 +285,11 @@ SemanticState::ConsumerImpl::ConsumerImp ackExpected(ack), acquire(_acquire), blocked(true), - windowing(true), - windowActive(false), exclusive(_exclusive), resumeId(_resumeId), tag(_tag), resumeTtl(_resumeTtl), arguments(_arguments), - msgCredit(0), - byteCredit(0), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), deliveryCount(0), @@ -338,11 +334,11 @@ bool SemanticState::ConsumerImpl::delive { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing); + DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, credit.isWindowMode()); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); - if (windowing || ackExpected || !acquire) { + if (credit.isWindowMode() || ackExpected || !acquire) { parent->record(record); } if (acquire && !ackExpected) { // auto acquire && auto accept @@ -385,28 +381,19 @@ ostream& operator<<(ostream& o, const Co void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { assertClusterSafe(); - uint32_t originalMsgCredit = msgCredit; - uint32_t originalByteCredit = byteCredit; - if (msgCredit != 0xFFFFFFFF) { - msgCredit--; - } - if (byteCredit != 0xFFFFFFFF) { - byteCredit -= msg->getRequiredCredit(); - } + Credit original = credit; + credit.consume(1, msg->getRequiredCredit()); QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) - << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit - << " now bytes: " << byteCredit << " msgs: " << msgCredit); + << ", was " << original << " now " << credit); } bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) { - bool enoughCredit = msgCredit > 0 && - (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit()); - QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ") - << ConsumerName(*this) - << ", have bytes: " << byteCredit << " msgs: " << msgCredit - << ", need " << msg->getRequiredCredit() << " bytes"); + bool enoughCredit = credit.check(1, msg->getRequiredCredit()); + QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient") + << " credit for message of " << msg->getRequiredCredit() << " bytes: " + << credit); return enoughCredit; } @@ -539,9 +526,8 @@ void SemanticState::ConsumerImpl::comple { if (!delivery.isComplete()) { delivery.complete(); - if (windowing && windowActive) { - if (msgCredit != 0xFFFFFFFF) msgCredit++; - if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); + if (credit.isWindowMode()) { + credit.moveWindow(1, delivery.getCredit()); } } } @@ -628,7 +614,7 @@ void SemanticState::stop(const std::stri void SemanticState::ConsumerImpl::setWindowMode() { assertClusterSafe(); - windowing = true; + credit.setWindowMode(true); if (mgmtObject){ mgmtObject->set_creditMode("WINDOW"); } @@ -637,7 +623,7 @@ void SemanticState::ConsumerImpl::setWin void SemanticState::ConsumerImpl::setCreditMode() { assertClusterSafe(); - windowing = false; + credit.setWindowMode(false); if (mgmtObject){ mgmtObject->set_creditMode("CREDIT"); } @@ -646,26 +632,18 @@ void SemanticState::ConsumerImpl::setCre void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { assertClusterSafe(); - if (windowing) windowActive = true; - if (byteCredit != 0xFFFFFFFF) { - if (value == 0xFFFFFFFF) byteCredit = value; - else byteCredit += value; - } + credit.addByteCredit(value); } void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { assertClusterSafe(); - if (windowing) windowActive = true; - if (msgCredit != 0xFFFFFFFF) { - if (value == 0xFFFFFFFF) msgCredit = value; - else msgCredit += value; - } + credit.addMessageCredit(value); } bool SemanticState::ConsumerImpl::haveCredit() { - if (msgCredit && byteCredit) { + if (credit) { return true; } else { blocked = true; @@ -677,16 +655,13 @@ void SemanticState::ConsumerImpl::flush( { while(haveCredit() && queue->dispatch(shared_from_this())) ; - msgCredit = 0; - byteCredit = 0; + credit.cancel(); } void SemanticState::ConsumerImpl::stop() { assertClusterSafe(); - msgCredit = 0; - byteCredit = 0; - windowActive = false; + credit.cancel(); } Queue::shared_ptr SemanticState::getQueue(const string& name) const { Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1205467&r1=1205466&r2=1205467&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Wed Nov 23 16:01:25 2011 @@ -23,6 +23,7 @@ */ #include "qpid/broker/Consumer.h" +#include "qpid/broker/Credit.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/DeliveryAdapter.h" #include "qpid/broker/DeliveryRecord.h" @@ -79,15 +80,12 @@ class SemanticState : private boost::non const bool ackExpected; const bool acquire; bool blocked; - bool windowing; - bool windowActive; bool exclusive; std::string resumeId; const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command uint64_t resumeTtl; framing::FieldTable arguments; - uint32_t msgCredit; - uint32_t byteCredit; + Credit credit; bool notifyEnabled; const int syncFrequency; int deliveryCount; @@ -131,12 +129,11 @@ class SemanticState : private boost::non bool doOutput(); + Credit& getCredit() { return credit; } + const Credit& getCredit() const { return credit; } bool isAckExpected() const { return ackExpected; } bool isAcquire() const { return acquire; } - bool isWindowing() const { return windowing; } bool isExclusive() const { return exclusive; } - uint32_t getMsgCredit() const { return msgCredit; } - uint32_t getByteCredit() const { return byteCredit; } std::string getResumeId() const { return resumeId; }; const std::string& getTag() const { return tag; } uint64_t getResumeTtl() const { return resumeTtl; } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1205467&r1=1205466&r2=1205467&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Nov 23 16:01:25 2011 @@ -199,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpi * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1159329; +const uint32_t Cluster::CLUSTER_VERSION = 1159330; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1205467&r1=1205466&r2=1205467&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Nov 23 16:01:25 2011 @@ -404,11 +404,13 @@ void Connection::shadowSetUser(const std connection->setUserId(userId); } -void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position) +void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position, + uint32_t usedMsgCredit, uint32_t usedByteCredit) { broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name); c->position = position; c->setBlocked(blocked); + if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, usedByteCredit); if (notifyEnabled) c->enableNotify(); else c->disableNotify(); updateIn.consumerNumbering.add(c); } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1205467&r1=1205466&r2=1205467&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Nov 23 16:01:25 2011 @@ -109,7 +109,8 @@ class Connection : // Called for data delivered from the cluster. void deliveredFrame(const EventFrame&); - void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position); + void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position, + uint32_t usedMsgCredit, uint32_t usedByteCredit); // ==== Used in catch-up mode to build initial state. // Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1205467&r1=1205466&r2=1205467&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Nov 23 16:01:25 2011 @@ -535,14 +535,16 @@ void UpdateClient::updateConsumer( arg::resumeTtl = ci->getResumeTtl(), arg::arguments = ci->getArguments() ); - shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); - shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); - shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit()); + shadowSession.messageSetFlowMode(ci->getTag(), ci->getCredit().isWindowMode() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); + shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getCredit().allocated().messages); + shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getCredit().allocated().bytes); ClusterConnectionProxy(shadowSession).consumerState( ci->getTag(), ci->isBlocked(), ci->isNotifyEnabled(), - ci->position + ci->position, + ci->getCredit().used().messages, + ci->getCredit().used().bytes ); consumerNumbering.add(ci.get()); Modified: qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=1205467&r1=1205466&r2=1205467&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Wed Nov 23 16:01:25 2011 @@ -354,6 +354,8 @@ QPID_AUTO_TEST_CASE(testCompleteOnAccept BOOST_CHECK(!q.get(m)); s.accept(accepted); + //need to reallocate credit as we have flushed it all out + s.setFlowControl(FlowControl::messageWindow(chunk)); fix.session.messageFlush(arg::destination=s.getName()); accepted.clear(); Modified: qpid/trunk/qpid/cpp/xml/cluster.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1205467&r1=1205466&r2=1205467&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/xml/cluster.xml (original) +++ qpid/trunk/qpid/cpp/xml/cluster.xml Wed Nov 23 16:01:25 2011 @@ -176,6 +176,8 @@ <field name="blocked" type="bit"/> <field name="notifyEnabled" type="bit"/> <field name="position" type="sequence-no"/> + <field name="used-msg-credit" type="uint32"/> + <field name="used-byte-credit" type="uint32"/> </control> <!-- Delivery-record for outgoing messages sent but not yet accepted. --> Modified: qpid/trunk/qpid/python/qpid/testlib.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=1205467&r1=1205466&r2=1205467&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/testlib.py (original) +++ qpid/trunk/qpid/python/qpid/testlib.py Wed Nov 23 16:01:25 2011 @@ -187,6 +187,7 @@ class TestBase010(unittest.TestCase): self.conn = self.connect() self.session = self.conn.session("test-session", timeout=10) self.qmf = None + self.test_queue_name = self.id() def startQmf(self, handler=None): self.qmf = qmf.console.Session(handler) Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py?rev=1205467&r1=1205466&r2=1205467&view=diff ============================================================================== --- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py (original) +++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py Wed Nov 23 16:01:25 2011 @@ -611,6 +611,84 @@ class MessageTests(TestBase010): msg = q.get(timeout = 1) self.assertDataEquals(session, msg, "Message %d" % (i+6)) + def test_credit_window_after_messagestop(self): + """ + Tests that the broker's credit window size doesnt exceed the requested value when completing + previous messageTransfer commands after a message_stop and message_flow. + """ + + session = self.session + + #create queue + session.queue_declare(queue = self.test_queue_name, exclusive=True, auto_delete=True) + + #send 11 messages + for i in range(1, 12): + session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name), "message-%d" % (i))) + + + #subscribe: + session.message_subscribe(queue=self.test_queue_name, destination="a") + a = session.incoming("a") + session.message_set_flow_mode(flow_mode = 1, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + # issue 5 message credits + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a") + + # get 5 messages + ids = RangedSet() + for i in range(1, 6): + msg = a.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + ids.add(msg.id) + + # now try and read a 6th message. we expect this to fail due to exhausted message credit. + try: + extra = a.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + session.message_stop(destination = "a") + + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a") + + # complete earlier messages after setting the window to 5 message credits + session.channel.session_completed(ids) + + # Now continue to read the next 5 messages + for i in range(6, 11): + msg = a.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + + # now try and read the 11th message. we expect this to fail due to exhausted message credit. If we receive an + # 11th this indicates the broker is not respecting the client's requested window size. + try: + extra = a.get(timeout=1) + self.fail("Got unexpected message: " + extra.body) + except Empty: None + + def test_no_credit_wrap(self): + """ + Ensure that adding credit does not result in wrapround, lowering the balance. + """ + session = self.session + + session.queue_declare(queue = self.test_queue_name, exclusive=True, auto_delete=True) + session.message_subscribe(queue=self.test_queue_name, destination="a") + a = session.incoming("a") + session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a") + session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a") + session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFAL, destination = "a") + #test wraparound of credit balance does not occur + session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a") + for i in range(1, 50): + session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name), "message-%d" % (i))) + session.message_flush(destination = "a") + for i in range(1, 50): + msg = a.get(timeout = 1) + self.assertEquals("message-%d" % (i), msg.body) + def test_subscribe_not_acquired(self): """ --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org