Author: aconway
Date: Wed May 16 17:19:48 2012
New Revision: 1339268

URL: http://svn.apache.org/viewvc?rev=1339268&view=rev
Log:
QPID-3603: HA don't replicate exclusive, auto-delete, non-timeout queues.

Such queues don't need to be replicated because they are destroyed when
the owning session disconnects, so they won't survive a failover.

This eliminates management subscription queues from replication.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1339268&r1=1339267&r2=1339268&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Wed May 16 17:19:48 
2012
@@ -73,6 +73,8 @@ const string ARGS("args");
 const string ARGUMENTS("arguments");
 const string AUTODEL("autoDel");
 const string AUTODELETE("autoDelete");
+const string EXCL("excl");
+const string EXCLUSIVE("exclusive");
 const string BIND("bind");
 const string UNBIND("unbind");
 const string BINDING("binding");
@@ -278,7 +280,9 @@ void BrokerReplicator::route(Deliverable
 void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
     string name = values[QNAME].asString();
     Variant::Map argsMap = asMapVoid(values[ARGS]);
-    if (!haBroker.replicateLevel(argsMap)) return; // Not a replicated queue.
+    if (!isReplicated(
+            values[ARGS].asMap(), values[AUTODEL].asBool(), 
values[EXCL].asBool()))
+        return;
     if (values[DISP] == CREATED && haBroker.replicateLevel(argsMap)) {
         framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
@@ -286,19 +290,20 @@ void BrokerReplicator::doEventQueueDecla
         // The queue was definitely created on the primary.
         if (broker.getQueues().find(name)) {
             broker.getQueues().destroy(name);
-            QPID_LOG(warning, logPrefix << "queue declare event, replaced 
exsiting: " << name);
+            QPID_LOG(warning, logPrefix << "queue declare event, replaced 
exsiting: "
+                     << name);
         }
         std::pair<boost::shared_ptr<Queue>, bool> result =
             broker.createQueue(
                 name,
                 values[DURABLE].asBool(),
                 values[AUTODEL].asBool(),
-                0 /*i.e. no owner regardless of exclusivity on master*/,
+                0, // no owner regardless of exclusivity on primary
                 values[ALTEX].asString(),
                 args,
                 values[USER].asString(),
                 values[RHOST].asString());
-        assert(result.second);
+        assert(result.second);  // Should be true since we destroyed existing 
queue above
         QPID_LOG(debug, logPrefix << "queue declare event: " << name);
         startQueueReplicator(result.first);
     }
@@ -420,7 +425,10 @@ void BrokerReplicator::doEventUnbind(Var
 
 void BrokerReplicator::doResponseQueue(Variant::Map& values) {
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
-    if (!haBroker.replicateLevel(argsMap)) return;
+    if (!isReplicated(values[ARGUMENTS].asMap(),
+                      values[AUTODELETE].asBool(),
+                      values[EXCLUSIVE].asBool()))
+        return;
     framing::FieldTable args;
     amqp_0_10::translate(argsMap, args);
     string name(values[NAME].asString());
@@ -522,6 +530,17 @@ void BrokerReplicator::doResponseHaBroke
     }
 }
 
+namespace {
+const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
+}
+
+bool BrokerReplicator::isReplicated(
+    const Variant::Map& args, bool autodelete, bool exclusive)
+{
+    bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == 
args.end();
+    return haBroker.replicateLevel(args) && !ignore;
+}
+
 void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& 
queue)
 {
     if (haBroker.replicateLevel(queue->getSettings()) == ALL) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1339268&r1=1339267&r2=1339268&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Wed May 16 17:19:48 2012
@@ -88,7 +88,8 @@ class BrokerReplicator : public broker::
     void doResponseHaBroker(types::Variant::Map& values);
 
     QueueReplicatorPtr findQueueReplicator(const std::string& qname);
-    void  startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+    void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+    bool isReplicated(const types::Variant::Map& args, bool autodelete, bool 
exclusive);
     void ready();
 
     LogPrefix logPrefix;

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1339268&r1=1339267&r2=1339268&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Wed May 16 17:19:48 2012
@@ -171,12 +171,12 @@ def wait_address(session, address):
         except NotFound: return False
     assert retry(check), "Timed out waiting for address %s"%(address)
 
-def assert_missing(session, address):
-    """Assert that the address is _not_ valid"""
+def valid_address(session, address):
+    """Test if an address is valid"""
     try:
         session.receiver(address)
-        self.fail("Expected NotFound: %s"%(address))
-    except NotFound: pass
+        return True
+    except NotFound: return False
 
 class ReplicationTests(BrokerTest):
     """Correctness tests for  HA replication."""
@@ -223,7 +223,7 @@ class ReplicationTests(BrokerTest):
             self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
 
             self.assert_browse_retry(b, prefix+"q2", []) # configuration only
-            assert_missing(b, prefix+"q3")
+            assert not valid_address(b, prefix+"q3")
             b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds 
with replicate=all
             self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
             b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds 
with replicate=configuration
@@ -603,9 +603,26 @@ class ReplicationTests(BrokerTest):
         test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
         test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
 
+    def test_auto_delete_exclusive(self):
+        """Verify that we ignore auto-delete, exclusive, 
non-auto-delete-timeout queues"""
+        cluster = HaCluster(self,2)
+        s = cluster[0].connect().session()
+        
s.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
+        s.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
+        s.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
+        
s.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+        s.receiver("q;{create:always}")
+
+        s = cluster[1].connect_admin().session()
+        cluster[1].wait_backup("q")
+        assert not valid_address(s, "exad")
+        assert valid_address(s, "ex")
+        assert valid_address(s, "ad")
+        assert valid_address(s, "time")
+
     def test_recovering(self):
         """Verify that the primary broker does not go active until expected
-        backups have connected or timeout expires."""
+        backups have connected"""
         cluster = HaCluster(self, 3, args=["--ha-expected-backups=2"])
         c = cluster[0].connect()
         for i in xrange(10):
@@ -691,13 +708,14 @@ class LongTests(BrokerTest):
                 receiver.receiver.assert_running()
                 n = receiver.received
                 # FIXME aconway 2012-05-01: don't kill primary till it's active
-                # otherwise we can lose messages. This is in lieu of not
-                # promoting catchup brokers.
+                # otherwise we can lose messages. When we implement 
non-promotion
+                # of catchup brokers we can make this stronger: wait only for
+                # there to be at least one ready backup.
                 assert retry(brokers[i%3].try_connect, 1)
                 brokers.bounce(i%3)
                 i += 1
                 def enough():        # Verify we're still running
-                    receiver.check()        # Verify no exceptions
+                    receiver.check() # Verify no exceptions
                     return receiver.received > n + 100
                 assert retry(enough, 1)
         except:



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to