Author: aconway
Date: Tue Dec 22 14:22:17 2009
New Revision: 893175

URL: http://svn.apache.org/viewvc?rev=893175&view=rev
Log:
QPID-2296: Cluster errors when using acquire-mode-not-acquired

Replicate consumer's queue position to new cluster nodes.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=893175&r1=893174&r2=893175&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Dec 22 14:22:17 2009
@@ -269,9 +269,10 @@
     return sessionState().getSemanticState();
 }
 
-void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled)
+void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
 {
     broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
+    c.position = position;
     c.setBlocked(blocked);
     if (notifyEnabled) c.enableNotify(); else c.disableNotify();
     consumerNumbering.add(c.shared_from_this());

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=893175&r1=893174&r2=893175&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Dec 22 14:22:17 2009
@@ -108,7 +108,7 @@
     // Called for data delivered from the cluster.
     void deliveredFrame(const EventFrame&);
 
-    void consumerState(const std::string& name, bool blocked, bool notifyEnabled);
+    void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position);
     
     // ==== Used in catch-up mode to build initial state.
     // 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=893175&r1=893174&r2=893175&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Dec 22 14:22:17 2009
@@ -381,7 +381,8 @@
     ClusterConnectionProxy(shadowSession).consumerState(
         ci->getName(),
         ci->isBlocked(),
-        ci->isNotifyEnabled()
+        ci->isNotifyEnabled(),
+        ci->position
     );
     consumerNumbering.add(ci);
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=893175&r1=893174&r2=893175&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Dec 22 14:22:17 2009
@@ -1149,6 +1149,43 @@
     BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m));
 }
 
+// Test that consumer positions are updated correctly.
+// Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=541927
+//
+QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) {
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c0(cluster[0], "c0");
+
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    SubscriptionSettings settings;
+    settings.autoAck = 0;
+    // Set the acquire mode to 'not-acquired' so that the consumer moves along
+    // the queue but does not acquire (remove) messages.
+    settings.acquireMode = ACQUIRE_MODE_NOT_ACQUIRED;
+    Subscription s = c0.subs.subscribe(c0.lq, "q", settings);
+    c0.session.messageTransfer(arg::content=makeMessage("1", "q", durableFlag));
+    BOOST_CHECK_EQUAL("1", c0.lq.get(TIMEOUT).getData());
+
+    // Add another member, send/receive another message and acquire
+    // the messages.  With the bug, this creates an inconsistency
+    // because the browse position was not updated to the new member.
+    cluster.add();
+    c0.session.messageTransfer(arg::content=makeMessage("2", "q", durableFlag));
+    BOOST_CHECK_EQUAL("2", c0.lq.get(TIMEOUT).getData());
+    s.acquire(s.getUnacquired());
+    s.accept(s.getUnaccepted());
+
+    // In the bug we now have 0 messages on cluster[0] and 1 message on cluster[1].
+    // Subscribing on cluster[1] provokes an error that shuts down cluster[0]
+    Client c1(cluster[1], "c1");
+    Subscription s1 = c1.subs.subscribe(c1.lq, "q"); // Default auto-ack=1
+    Message m;
+    BOOST_CHECK(!c1.lq.get(m, TIMEOUT/10));
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+}
 
 QPID_AUTO_TEST_SUITE_END()
 }} // namespace qpid::tests

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=893175&r1=893174&r2=893175&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Dec 22 14:22:17 2009
@@ -137,6 +137,7 @@
       <field name="name" type="str8"/>
       <field name="blocked" type="bit"/>
       <field name="notifyEnabled" type="bit"/>
+      <field name="position" type="sequence-no"/>
     </control>
 
     <!-- Delivery-record for outgoing messages sent but not yet accepted. -->



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to