Author: gsim
Date: Wed Nov 23 16:01:25 2011
New Revision: 1205467

URL: http://svn.apache.org/viewvc?rev=1205467&view=rev
Log:
QPID-3629: Changed management of credit window

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/Credit.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Credit.h
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml
    qpid/trunk/qpid/python/qpid/testlib.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1205467&r1=1205466&r2=1205467&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Wed Nov 23 16:01:25 2011
@@ -993,6 +993,7 @@ set (qpidbroker_SOURCES
      qpid/amqp_0_10/Connection.h
      qpid/amqp_0_10/Connection.cpp
      qpid/broker/Broker.cpp
+     qpid/broker/Credit.cpp
      qpid/broker/Exchange.cpp
      qpid/broker/ExpiryPolicy.cpp
      qpid/broker/Fairshare.cpp

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1205467&r1=1205466&r2=1205467&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Nov 23 16:01:25 2011
@@ -534,6 +534,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/ConnectionState.h \
   qpid/broker/ConnectionToken.h \
   qpid/broker/Consumer.h \
+  qpid/broker/Credit.h \
+  qpid/broker/Credit.cpp \
   qpid/broker/Daemon.cpp \
   qpid/broker/Daemon.h \
   qpid/broker/Deliverable.h \

Added: qpid/trunk/qpid/cpp/src/qpid/broker/Credit.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Credit.cpp?rev=1205467&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Credit.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Credit.cpp Wed Nov 23 16:01:25 2011
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Credit.h"
+
+namespace qpid {
+namespace broker {
+
+const uint32_t CreditBalance::INFINITE_CREDIT(0xFFFFFFFF);
+CreditBalance::CreditBalance() : balance(0) {}
+CreditBalance::~CreditBalance() {}
+void CreditBalance::clear() { balance = 0; }
+void CreditBalance::grant(uint32_t value)
+{
+    if (balance != INFINITE_CREDIT) {
+        if (value == INFINITE_CREDIT) {
+            balance = INFINITE_CREDIT;
+        } else if (INFINITE_CREDIT - balance > value) {
+            balance += value;
+        } else {
+            balance = INFINITE_CREDIT - 1;
+        }
+    }
+}
+void CreditBalance::consume(uint32_t value) { if (!unlimited()) balance -= 
value; }
+bool CreditBalance::check(uint32_t required) const { return balance >= 
required; }
+uint32_t CreditBalance::remaining() const { return balance; }
+uint32_t CreditBalance::allocated() const { return balance; }
+bool CreditBalance::unlimited() const { return balance == INFINITE_CREDIT; }
+
+CreditWindow::CreditWindow() : used(0) {}
+bool CreditWindow::check(uint32_t required) const { return 
CreditBalance::check(used + required); }
+void CreditWindow::consume(uint32_t value) { if (!unlimited()) used += value; }
+void CreditWindow::move(uint32_t value) { if (!unlimited()) used -= value; }
+uint32_t CreditWindow::remaining() const { return allocated() - used; }
+uint32_t CreditWindow::consumed() const { return used; }
+
+Credit::Credit() : windowing(true) {}
+void Credit::setWindowMode(bool b) { windowing = b; }
+bool Credit::isWindowMode() const { return windowing; }
+void Credit::addByteCredit(uint32_t value)
+{
+    bytes().grant(value);
+}
+void Credit::addMessageCredit(uint32_t value)
+{
+    messages().grant(value);
+}
+void Credit::cancel()
+{
+    messages().clear();
+    bytes().clear();
+}
+void Credit::moveWindow(uint32_t m, uint32_t b)
+{
+    if (windowing) {
+        window.messages.move(m);
+        window.bytes.move(b);
+    }
+}
+void Credit::consume(uint32_t m, uint32_t b)
+{
+    messages().consume(m);
+    bytes().consume(b);
+}
+bool Credit::check(uint32_t m, uint32_t b) const
+{
+    return messages().check(m) && bytes().check(b);
+}
+CreditPair<uint32_t> Credit::used() const
+{
+    CreditPair<uint32_t> result;
+    if (windowing) {
+        result.messages = window.messages.consumed();
+        result.bytes = window.bytes.consumed();
+    } else {
+        result.messages = 0;
+        result.bytes = 0;
+    }
+    return result;
+}
+CreditPair<uint32_t> Credit::allocated() const
+{
+    CreditPair<uint32_t> result;
+    result.messages = messages().allocated();
+    result.bytes = bytes().allocated();
+    return result;
+}
+Credit::operator bool() const
+{
+    return check(1,1);
+}
+CreditBalance& Credit::messages()
+{
+    if (windowing) return window.messages;
+    else return balance.messages;
+}
+CreditBalance& Credit::bytes()
+{
+    if (windowing) return window.bytes;
+    else return balance.bytes;
+}
+const CreditBalance& Credit::messages() const
+{
+    if (windowing) return window.messages;
+    else return balance.messages;
+}
+const CreditBalance& Credit::bytes() const
+{
+    if (windowing) return window.bytes;
+    else return balance.bytes;
+}
+std::ostream& operator<<(std::ostream& out, const CreditBalance& b)
+{
+    if (b.unlimited()) return out << "unlimited";
+    else return out << b.balance;
+}
+std::ostream& operator<<(std::ostream& out, const CreditWindow& w)
+{
+    if (w.unlimited()) return out << ((CreditBalance) w);
+    else return out << w.remaining() << " (from window of " << w.allocated() 
<< ")";
+}
+template <class T>
+std::ostream& operator<<(std::ostream& out, const CreditPair<T>& pair)
+{
+    return out << "messages: " << pair.messages << " bytes: " << pair.bytes;
+}
+std::ostream& operator<<(std::ostream& out, const Credit& c)
+{
+    if (c.windowing) return out << c.window;
+    else return out << c.balance;
+}
+
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/Credit.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Credit.h?rev=1205467&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Credit.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Credit.h Wed Nov 23 16:01:25 2011
@@ -0,0 +1,96 @@
+#ifndef QPID_BROKER_CREDIT_H
+#define QPID_BROKER_CREDIT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/IntegerTypes.h"
+#include <memory>
+#include <ostream>
+
+namespace qpid {
+namespace broker {
+
+class CreditBalance {
+  public:
+    CreditBalance();
+    virtual ~CreditBalance();
+    void clear();
+    void grant(uint32_t value);
+    virtual void consume(uint32_t value);
+    virtual bool check(uint32_t required) const;
+    virtual uint32_t remaining() const;
+    uint32_t allocated() const;
+    bool unlimited() const;
+    static const uint32_t INFINITE_CREDIT;
+  friend std::ostream& operator<<(std::ostream&, const CreditBalance&);
+  private:
+    uint32_t balance;
+};
+
+class CreditWindow : public CreditBalance {
+  public:
+    CreditWindow();
+    bool check(uint32_t required) const;
+    void consume(uint32_t value);
+    void move(uint32_t value);
+    uint32_t remaining() const;
+    uint32_t consumed() const;
+  friend std::ostream& operator<<(std::ostream&, const CreditWindow&);
+  private:
+    uint32_t used;
+};
+
+template<class T> struct CreditPair
+{
+    T messages;
+    T bytes;
+};
+
+class Credit {
+  public:
+    Credit();
+    void setWindowMode(bool);
+    bool isWindowMode() const;
+    void addByteCredit(uint32_t);
+    void addMessageCredit(uint32_t);
+    void consume(uint32_t messages, uint32_t bytes);
+    void moveWindow(uint32_t messages, uint32_t bytes);
+    bool check(uint32_t messages, uint32_t bytes) const;
+    void cancel();
+    operator bool() const;
+    CreditPair<uint32_t> allocated() const;
+    CreditPair<uint32_t> used() const;
+  friend std::ostream& operator<<(std::ostream&, const Credit&);
+  private:
+    CreditPair<CreditBalance> balance;
+    CreditPair<CreditWindow> window;
+    bool windowing;
+    CreditBalance& bytes();
+    CreditBalance& messages();
+    const CreditBalance& bytes() const;
+    const CreditBalance& messages() const;
+};
+
+std::ostream& operator<<(std::ostream&, const Credit&);
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_CREDIT_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1205467&r1=1205466&r2=1205467&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Nov 23 16:01:25 
2011
@@ -285,15 +285,11 @@ SemanticState::ConsumerImpl::ConsumerImp
     ackExpected(ack),
     acquire(_acquire),
     blocked(true),
-    windowing(true),
-    windowActive(false),
     exclusive(_exclusive),
     resumeId(_resumeId),
     tag(_tag),
     resumeTtl(_resumeTtl),
     arguments(_arguments),
-    msgCredit(0),
-    byteCredit(0),
     notifyEnabled(true),
     syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
     deliveryCount(0),
@@ -338,11 +334,11 @@ bool SemanticState::ConsumerImpl::delive
 {
     assertClusterSafe();
     allocateCredit(msg.payload);
-    DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, 
windowing);
+    DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, 
credit.isWindowMode());
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
     if (sync) deliveryCount = 0;//reset
     parent->deliver(record, sync);
-    if (windowing || ackExpected || !acquire) {
+    if (credit.isWindowMode() || ackExpected || !acquire) {
         parent->record(record);
     }
     if (acquire && !ackExpected) {  // auto acquire && auto accept
@@ -385,28 +381,19 @@ ostream& operator<<(ostream& o, const Co
 void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
 {
     assertClusterSafe();
-    uint32_t originalMsgCredit = msgCredit;
-    uint32_t originalByteCredit = byteCredit;
-    if (msgCredit != 0xFFFFFFFF) {
-        msgCredit--;
-    }
-    if (byteCredit != 0xFFFFFFFF) {
-        byteCredit -= msg->getRequiredCredit();
-    }
+    Credit original = credit;
+    credit.consume(1, msg->getRequiredCredit());
     QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
-             << ", was " << " bytes: " << originalByteCredit << " msgs: " << 
originalMsgCredit
-             << " now bytes: " << byteCredit << " msgs: " << msgCredit);
+             << ", was " << original << " now " << credit);
 
 }
 
 bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
 {
-    bool enoughCredit = msgCredit > 0 &&
-        (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit());
-    QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient 
credit for ")
-             << ConsumerName(*this)
-             << ", have bytes: " << byteCredit << " msgs: " << msgCredit
-             << ", need " << msg->getRequiredCredit() << " bytes");
+    bool enoughCredit = credit.check(1, msg->getRequiredCredit());
+    QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << 
(enoughCredit ? "sufficient " : "insufficient")
+             <<  " credit for message of " << msg->getRequiredCredit() << " 
bytes: "
+             << credit);
     return enoughCredit;
 }
 
@@ -539,9 +526,8 @@ void SemanticState::ConsumerImpl::comple
 {
     if (!delivery.isComplete()) {
         delivery.complete();
-        if (windowing && windowActive) {
-            if (msgCredit != 0xFFFFFFFF) msgCredit++;
-            if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
+        if (credit.isWindowMode()) {
+            credit.moveWindow(1, delivery.getCredit());
         }
     }
 }
@@ -628,7 +614,7 @@ void SemanticState::stop(const std::stri
 void SemanticState::ConsumerImpl::setWindowMode()
 {
     assertClusterSafe();
-    windowing = true;
+    credit.setWindowMode(true);
     if (mgmtObject){
         mgmtObject->set_creditMode("WINDOW");
     }
@@ -637,7 +623,7 @@ void SemanticState::ConsumerImpl::setWin
 void SemanticState::ConsumerImpl::setCreditMode()
 {
     assertClusterSafe();
-    windowing = false;
+    credit.setWindowMode(false);
     if (mgmtObject){
         mgmtObject->set_creditMode("CREDIT");
     }
@@ -646,26 +632,18 @@ void SemanticState::ConsumerImpl::setCre
 void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
 {
     assertClusterSafe();
-    if (windowing) windowActive = true;
-    if (byteCredit != 0xFFFFFFFF) {
-        if (value == 0xFFFFFFFF) byteCredit = value;
-        else byteCredit += value;
-    }
+    credit.addByteCredit(value);
 }
 
 void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
 {
     assertClusterSafe();
-    if (windowing) windowActive = true;
-    if (msgCredit != 0xFFFFFFFF) {
-        if (value == 0xFFFFFFFF) msgCredit = value;
-        else msgCredit += value;
-    }
+    credit.addMessageCredit(value);
 }
 
 bool SemanticState::ConsumerImpl::haveCredit()
 {
-    if (msgCredit && byteCredit) {
+    if (credit) {
         return true;
     } else {
         blocked = true;
@@ -677,16 +655,13 @@ void SemanticState::ConsumerImpl::flush(
 {
     while(haveCredit() && queue->dispatch(shared_from_this()))
         ;
-    msgCredit = 0;
-    byteCredit = 0;
+    credit.cancel();
 }
 
 void SemanticState::ConsumerImpl::stop()
 {
     assertClusterSafe();
-    msgCredit = 0;
-    byteCredit = 0;
-    windowActive = false;
+    credit.cancel();
 }
 
 Queue::shared_ptr SemanticState::getQueue(const string& name) const {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1205467&r1=1205466&r2=1205467&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Wed Nov 23 16:01:25 2011
@@ -23,6 +23,7 @@
  */
 
 #include "qpid/broker/Consumer.h"
+#include "qpid/broker/Credit.h"
 #include "qpid/broker/Deliverable.h"
 #include "qpid/broker/DeliveryAdapter.h"
 #include "qpid/broker/DeliveryRecord.h"
@@ -79,15 +80,12 @@ class SemanticState : private boost::non
         const bool ackExpected;
         const bool acquire;
         bool blocked;
-        bool windowing;
-        bool windowActive;
         bool exclusive;
         std::string resumeId;
         const std::string tag;  // <destination> from AMQP 0-10 
Message.subscribe command
         uint64_t resumeTtl;
         framing::FieldTable arguments;
-        uint32_t msgCredit;
-        uint32_t byteCredit;
+        Credit credit;
         bool notifyEnabled;
         const int syncFrequency;
         int deliveryCount;
@@ -131,12 +129,11 @@ class SemanticState : private boost::non
 
         bool doOutput();
 
+        Credit& getCredit() { return credit; }
+        const Credit& getCredit() const { return credit; }
         bool isAckExpected() const { return ackExpected; }
         bool isAcquire() const { return acquire; }
-        bool isWindowing() const { return windowing; }
         bool isExclusive() const { return exclusive; }
-        uint32_t getMsgCredit() const { return msgCredit; }
-        uint32_t getByteCredit() const { return byteCredit; }
         std::string getResumeId() const { return resumeId; };
         const std::string& getTag() const { return tag; }
         uint64_t getResumeTtl() const { return resumeTtl; }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1205467&r1=1205466&r2=1205467&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Nov 23 16:01:25 2011
@@ -199,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpi
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 1159329;
+const uint32_t Cluster::CLUSTER_VERSION = 1159330;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1205467&r1=1205466&r2=1205467&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Nov 23 16:01:25 2011
@@ -404,11 +404,13 @@ void Connection::shadowSetUser(const std
     connection->setUserId(userId);
 }
 
-void Connection::consumerState(const string& name, bool blocked, bool 
notifyEnabled, const SequenceNumber& position)
+void Connection::consumerState(const string& name, bool blocked, bool 
notifyEnabled, const SequenceNumber& position,
+                               uint32_t usedMsgCredit, uint32_t usedByteCredit)
 {
     broker::SemanticState::ConsumerImpl::shared_ptr c = 
semanticState().find(name);
     c->position = position;
     c->setBlocked(blocked);
+    if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, 
usedByteCredit);
     if (notifyEnabled) c->enableNotify(); else c->disableNotify();
     updateIn.consumerNumbering.add(c);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1205467&r1=1205466&r2=1205467&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Nov 23 16:01:25 2011
@@ -109,7 +109,8 @@ class Connection :
     // Called for data delivered from the cluster.
     void deliveredFrame(const EventFrame&);
 
-    void consumerState(const std::string& name, bool blocked, bool 
notifyEnabled, const qpid::framing::SequenceNumber& position);
+    void consumerState(const std::string& name, bool blocked, bool 
notifyEnabled, const qpid::framing::SequenceNumber& position,
+                       uint32_t usedMsgCredit, uint32_t usedByteCredit);
 
     // ==== Used in catch-up mode to build initial state.
     //

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1205467&r1=1205466&r2=1205467&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Nov 23 16:01:25 
2011
@@ -535,14 +535,16 @@ void UpdateClient::updateConsumer(
         arg::resumeTtl   = ci->getResumeTtl(),
         arg::arguments   = ci->getArguments()
     );
-    shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? 
FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
-    shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, 
ci->getMsgCredit());
-    shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, 
ci->getByteCredit());
+    shadowSession.messageSetFlowMode(ci->getTag(), 
ci->getCredit().isWindowMode() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
+    shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, 
ci->getCredit().allocated().messages);
+    shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, 
ci->getCredit().allocated().bytes);
     ClusterConnectionProxy(shadowSession).consumerState(
         ci->getTag(),
         ci->isBlocked(),
         ci->isNotifyEnabled(),
-        ci->position
+        ci->position,
+        ci->getCredit().used().messages,
+        ci->getCredit().used().bytes
     );
     consumerNumbering.add(ci.get());
 

Modified: qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=1205467&r1=1205466&r2=1205467&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Wed Nov 23 16:01:25 2011
@@ -354,6 +354,8 @@ QPID_AUTO_TEST_CASE(testCompleteOnAccept
     BOOST_CHECK(!q.get(m));
 
     s.accept(accepted);
+    //need to reallocate credit as we have flushed it all out
+    s.setFlowControl(FlowControl::messageWindow(chunk));
     fix.session.messageFlush(arg::destination=s.getName());
     accepted.clear();
 

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1205467&r1=1205466&r2=1205467&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Wed Nov 23 16:01:25 2011
@@ -176,6 +176,8 @@
       <field name="blocked" type="bit"/>
       <field name="notifyEnabled" type="bit"/>
       <field name="position" type="sequence-no"/>
+      <field name="used-msg-credit" type="uint32"/>
+      <field name="used-byte-credit" type="uint32"/>
     </control>
 
     <!-- Delivery-record for outgoing messages sent but not yet accepted. -->

Modified: qpid/trunk/qpid/python/qpid/testlib.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=1205467&r1=1205466&r2=1205467&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ qpid/trunk/qpid/python/qpid/testlib.py Wed Nov 23 16:01:25 2011
@@ -187,6 +187,7 @@ class TestBase010(unittest.TestCase):
         self.conn = self.connect()
         self.session = self.conn.session("test-session", timeout=10)
         self.qmf = None
+        self.test_queue_name = self.id()
 
     def startQmf(self, handler=None):
         self.qmf = qmf.console.Session(handler)

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py?rev=1205467&r1=1205466&r2=1205467&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py Wed Nov 23 
16:01:25 2011
@@ -611,6 +611,84 @@ class MessageTests(TestBase010):
             msg = q.get(timeout = 1)
             self.assertDataEquals(session, msg, "Message %d" % (i+6))
 
+    def test_credit_window_after_messagestop(self):
+        """
+        Tests that the broker's credit window size doesnt exceed the requested 
value when completing
+        previous messageTransfer commands after a message_stop and 
message_flow.
+        """
+
+        session = self.session
+
+        #create queue
+        session.queue_declare(queue = self.test_queue_name, exclusive=True, 
auto_delete=True)
+
+        #send 11 messages
+        for i in range(1, 12):
+            
session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name),
 "message-%d" % (i)))
+
+
+        #subscribe:
+        session.message_subscribe(queue=self.test_queue_name, destination="a")
+        a = session.incoming("a")
+        session.message_set_flow_mode(flow_mode = 1, destination = "a")
+        session.message_flow(unit = session.credit_unit.byte, value = 
0xFFFFFFFFL, destination = "a")
+        # issue 5 message credits
+        session.message_flow(unit = session.credit_unit.message, value = 5, 
destination = "a")
+
+        # get 5 messages
+        ids = RangedSet()
+        for i in range(1, 6):
+            msg = a.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.body)
+            ids.add(msg.id)
+
+        # now try and read a 6th message. we expect this to fail due to 
exhausted message credit.
+        try:
+            extra = a.get(timeout=1)
+            self.fail("Got unexpected message: " + extra.body)
+        except Empty: None
+
+        session.message_stop(destination = "a")
+
+        session.message_flow(unit = session.credit_unit.byte, value = 
0xFFFFFFFFL, destination = "a")
+        session.message_flow(unit = session.credit_unit.message, value = 5, 
destination = "a")
+
+        # complete earlier messages after setting the window to 5 message 
credits
+        session.channel.session_completed(ids)
+
+        # Now continue to read the next 5 messages
+        for i in range(6, 11):
+            msg = a.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.body)
+
+        # now try and read the 11th message. we expect this to fail due to 
exhausted message credit.  If we receive an
+        # 11th this indicates the broker is not respecting the client's 
requested window size.
+        try:
+            extra = a.get(timeout=1)
+            self.fail("Got unexpected message: " + extra.body)
+        except Empty: None
+
+    def test_no_credit_wrap(self):
+        """
+        Ensure that adding credit does not result in wrapround, lowering the 
balance.
+        """
+        session = self.session
+
+        session.queue_declare(queue = self.test_queue_name, exclusive=True, 
auto_delete=True)
+        session.message_subscribe(queue=self.test_queue_name, destination="a")
+        a = session.incoming("a")
+        session.message_set_flow_mode(flow_mode = session.flow_mode.credit, 
destination = "a")
+        session.message_flow(unit = session.credit_unit.byte, value = 
0xFFFFFFFFL, destination = "a")
+        session.message_flow(unit = session.credit_unit.message, value = 
0xFFFFFFFAL, destination = "a")
+        #test wraparound of credit balance does not occur
+        session.message_flow(unit = session.credit_unit.message, value = 10, 
destination = "a")
+        for i in range(1, 50):
+            
session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name),
 "message-%d" % (i)))
+        session.message_flush(destination = "a")
+        for i in range(1, 50):
+            msg = a.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.body)
+
 
     def test_subscribe_not_acquired(self):
         """



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to