Author: aconway
Date: Fri Feb 17 14:15:16 2012
New Revision: 1245542

URL: http://svn.apache.org/viewvc?rev=1245542&view=rev
Log:
QPID-3603: Reconnect URL in broker::Link

- Flatten known-hosts in Link to a single URL.
- Circular retry on failover URL.
- Allow setting a different retry URL.

Modified:
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/tests/brokertest.py
    qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py
    qpid/branches/qpid-3603-7/qpid/cpp/src/tests/reliable_replication_test
    qpid/branches/qpid-3603-7/qpid/tools/src/py/qpid-ha-tool

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp?rev=1245542&r1=1245541&r2=1245542&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp Fri Feb 17 14:15:16 2012
@@ -64,6 +64,7 @@ Link::Link(LinkRegistry*  _links,
       visitCount(0),
       currentInterval(1),
       closing(false),
+      reconnectNext(0),         // Index of next address for reconnecting in url.
       channelCounter(1),
       connection(0),
       agent(0)
@@ -146,11 +147,23 @@ void Link::established ()
     }
 }
 
+void Link::setUrl(const Url& u) {
+    Mutex::ScopedLock mutex(lock);
+    url = u;
+    reconnectNext = 0;
+}
+
 void Link::opened() {
     Mutex::ScopedLock mutex(lock);
     assert(connection);
-    urls.reset(connection->getKnownHosts());
-    QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
+    // Get default URL from known-hosts.
+    const std::vector<Url>& known = connection->getKnownHosts();
+    // Flatten vector of URLs into a single URL listing all addresses.
+    url.clear();
+    for(size_t i = 0; i < known.size(); ++i)
+        url.insert(url.end(), known[i].begin(), known[i].end());
+    reconnectNext = 0;
+    QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
 }
 
 void Link::closed(int, std::string text)
@@ -334,17 +347,16 @@ void Link::reconnect(const qpid::Address
     }
 }
 
-bool Link::tryFailover()
-{
-    Address next;
-    if (urls.next(next) &&
-        (next.host != host || next.port != port || next.protocol != transport)) {
+bool Link::tryFailover() {      // FIXME aconway 2012-01-30: lock held?
+    if (reconnectNext >= url.size()) reconnectNext = 0;
+    if (url.empty()) return false;
+    Address next = url[reconnectNext++];
+    if (next.host != host || next.port != port || next.protocol != transport) {
         links->changeAddress(Address(transport, host, port), next);
         QPID_LOG(debug, "Link failing over to " << host << ":" << port);
         return true;
-    } else {
-        return false;
     }
+    return false;
 }
 
 // Management updates for a linke are inconsistent in a cluster, so they are

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.h?rev=1245542&r1=1245541&r2=1245542&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.h Fri Feb 17 14:15:16 2012
@@ -23,10 +23,10 @@
  */
 
 #include <boost/shared_ptr.hpp>
+#include "qpid/Url.h"
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/PersistableConfig.h"
 #include "qpid/broker/Bridge.h"
-#include "qpid/broker/RetryList.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/management/Manageable.h"
@@ -60,7 +60,8 @@ namespace qpid {
             uint32_t visitCount;
             uint32_t currentInterval;
             bool     closing;
-            RetryList urls;
+            Url      url;       // URL can contain many addresses.
+            size_t   reconnectNext; // Index for next re-connect attempt
 
             typedef std::vector<Bridge::shared_ptr> Bridges;
             Bridges created;   // Bridges pending creation
@@ -111,6 +112,7 @@ namespace qpid {
             uint nextChannel();
             void add(Bridge::shared_ptr);
             void cancel(Bridge::shared_ptr);
+            void setUrl(const Url&); // Set URL for reconnection.
 
             void established(); // Called when connection is create
             void opened();      // Called when connection is open (after create)

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1245542&r1=1245541&r2=1245542&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp Fri Feb 17 14:15:16 2012
@@ -56,6 +56,7 @@ Backup::Backup(broker::Broker& b, const 
         s.mechanism, s.username, s.password);
     assert(result.second);  // FIXME aconway 2011-11-23: error handling
     link = result.first;
+    link->setUrl(Url(s.brokerUrl));
 
     replicator.reset(new BrokerReplicator(link));
     broker.getExchanges().registerExchange(replicator);
@@ -63,6 +64,11 @@ Backup::Backup(broker::Broker& b, const 
     broker.getConnectionObservers().add(excluder);
 }
 
+void Backup::setUrl(const Url& url) {
+    // FIXME aconway 2012-01-30: locking?
+    link->setUrl(url);
+}
+
 Backup::~Backup() {
     broker.getExchanges().destroy(replicator->getName());
     broker.getConnectionObservers().remove(excluder); // Allows client connections.

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h?rev=1245542&r1=1245541&r2=1245542&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h Fri Feb 17 14:15:16 2012
@@ -49,6 +49,7 @@ class Backup
   public:
     Backup(broker::Broker&, const Settings&);
     ~Backup();
+    void setUrl(const Url&);
 
   private:
     broker::Broker& broker;

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1245542&r1=1245541&r2=1245542&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Feb 17 14:15:16 2012
@@ -55,14 +55,15 @@ const std::string BACKUP="backup";
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
     : broker(b),
       settings(s),
-      clientUrl(url(s.clientUrl, "ha-client-url")),
       brokerUrl(url(s.brokerUrl, "ha-broker-url")),
+      clientUrl(s.clientUrl.empty() ? brokerUrl : url(s.clientUrl, "ha-client-url")),
       backup(new Backup(b, s)),
       mgmtObject(0)
 {
     // Note all HA brokers start out in backup mode.
-    QPID_LOG(notice, "HA: Backup initialized: client-url=" << clientUrl
-             << " broker-url=" << brokerUrl);
+    QPID_LOG(notice, "HA: Backup initialized: "
+             << " broker-url=" << brokerUrl
+             << " client-url=" << clientUrl);
     // Register a factory for replicating subscriptions.
     broker.getConsumerFactories().add(
         boost::shared_ptr<ReplicatingSubscription::Factory>(
@@ -95,17 +96,15 @@ Manageable::status_t HaBroker::Managemen
           break;
       }
       case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: {
-          QPID_LOG(critical, "FIXME" << "before " <<  clientUrl)
           clientUrl = dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args).i_clientAddresses;
-          QPID_LOG(critical, "FIXME" << "after " <<  clientUrl)
-          // FIXME aconway 2012-01-30: upate status for new URL
           mgmtObject->set_clientAddresses(clientUrl.str());
+          // FIXME aconway 2012-01-30: upate status for new URL
           break;
       }
       case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: {
           brokerUrl = dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args).i_brokerAddresses;
-          // FIXME aconway 2012-01-30: upate status for new URL
           mgmtObject->set_brokerAddresses(brokerUrl.str());
+          if (backup.get()) backup->setUrl(brokerUrl);
           break;
       }
       default:

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1245542&r1=1245541&r2=1245542&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.h Fri Feb 17 14:15:16 2012
@@ -57,7 +57,7 @@ class HaBroker : public management::Mana
     sys::Mutex lock;
     broker::Broker& broker;
     Settings settings;
-    Url clientUrl, brokerUrl;
+    Url brokerUrl, clientUrl;
     std::auto_ptr<Backup> backup;
     qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
 };

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/tests/brokertest.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/tests/brokertest.py?rev=1245542&r1=1245541&r2=1245542&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/tests/brokertest.py Fri Feb 17 14:15:16 2012
@@ -76,7 +76,7 @@ def error_line(filename, n=1):
     except: return ""
     return ":\n" + "".join(result)
 
-def retry(function, timeout=10, delay=.01):
+def retry(function, timeout=1, delay=.01):
     """Call function until it returns True or timeout expires.
     Double the delay for each retry. Return True if function
     returns true, False if timeout expires."""
@@ -277,8 +277,8 @@ class Broker(Popen):
         self.find_log()
         cmd += ["--log-to-file", self.log]
         cmd += ["--log-to-stderr=no"]
-        if log_level != None:
-            cmd += ["--log-enable=%s" % log_level]
+        cmd += ["--log-enable=%s"%(log_level or "info+") ]
+
         self.datadir = self.name
         cmd += ["--data-dir", self.datadir]
         if show_cmd: print cmd
@@ -526,7 +526,7 @@ class BrokerTest(TestCase):
         retry(test, timeout, delay)
         self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform))
 
-def join(thread, timeout=10):
+def join(thread, timeout=1):
     thread.join(timeout)
     if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
 

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py?rev=1245542&r1=1245541&r2=1245542&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py Fri Feb 17 14:15:16 2012
@@ -27,18 +27,30 @@ from logging import getLogger, WARN, ERR
 
 log = getLogger("qpid.ha-tests")
 
+class HaBroker(Broker):
+    def __init__(self, test, args=[], broker_url=None, **kwargs):
+        assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+        Broker.__init__(self, test,
+                        args=["--load-module", BrokerTest.ha_lib,
+                              "--ha-enable=yes",
+                              "--ha-broker-url", broker_url ],
+                        **kwargs)
+
+    def promote(self):
+        assert os.system("qpid-ha-tool --promote %s"%(self.host_port())) == 0
+
+    def set_client_url(self, url):
+        assert os.system(
+            "qpid-ha-tool --client-addresses=%s %s"%(url,self.host_port())) == 0
+
+    def set_broker_url(self, url):
+        assert os.system(
+            "qpid-ha-tool --broker-addresses=%s %s"%(url, self.host_port())) == 0
+
+
 class ShortTests(BrokerTest):
     """Short HA functionality tests."""
 
-    def ha_broker(self, args=[], client_url="dummy", broker_url="dummy", **kwargs):
-        assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
-        return Broker(self, args=["--load-module", BrokerTest.ha_lib,
-                                  "--ha-enable=yes",
-                                  "--ha-client-url", client_url,
-                                  "--ha-broker-url", broker_url,
-                                  ] + args,
-                      **kwargs)
-
     # FIXME aconway 2011-11-15: work around async configuration replication.
     # Wait for an address to become valid.
     def wait(self, session, address):
@@ -49,15 +61,19 @@ class ShortTests(BrokerTest):
             except NotFound: return False
         assert retry(check), "Timed out waiting for %s"%(address)
 
-    # FIXME aconway 2012-01-23: workaround: we need to give the
-    # backup a chance to attach to the queue.
+    # FIXME aconway 2012-01-23: work around async configuration replication.
+    # Wait for address to become valid on a backup broker.
     def wait_backup(self, backup, address):
         bs = self.connect_admin(backup).session()
         self.wait(bs, address)
         bs.connection.close()
 
-    def promote(self, broker):
-        os.system("qpid-ha-tool --promote %s"%(broker.host_port()))
+    # Combines wait_backup and assert_browse_retry
+    def assert_browse_backup(self, backup, queue, expected, **kwargs):
+        bs = self.connect_admin(backup).session()
+        self.wait(bs, queue)
+        self.assert_browse_retry(bs, queue, expected, **kwargs)
+        bs.connection.close()
 
     def assert_missing(self, session, address):
         try:
@@ -122,13 +138,13 @@ class ShortTests(BrokerTest):
             b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
             self.assert_browse_retry(b, prefix+"q4", ["6","7"])
 
-        primary = self.ha_broker(name="primary")
-        self.promote(primary)
+        primary = HaBroker(self, name="primary")
+        primary.promote()
         p = primary.connect().session()
 
         # Create config, send messages before starting the backup, to test catch-up replication.
         setup(p, "1", primary)
-        backup  = self.ha_broker(name="backup", broker_url=primary.host_port())
+        backup  = HaBroker(self, name="backup", broker_url=primary.host_port())
         # Create config, send messages after starting the backup, to test steady-state replication.
         setup(p, "2", primary)
 
@@ -165,16 +181,16 @@ class ShortTests(BrokerTest):
     def test_sync(self):
         def queue(name, replicate):
             return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
-        primary = self.ha_broker(name="primary")
-        self.promote(primary)
+        primary = HaBroker(self, name="primary")
+        primary.promote()
         p = primary.connect().session()
         s = p.sender(queue("q","messages"))
         for m in [str(i) for i in range(0,10)]: s.send(m)
         s.sync()
-        backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port())
+        backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
         for m in [str(i) for i in range(10,20)]: s.send(m)
         s.sync()
-        backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port())
+        backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
         for m in [str(i) for i in range(20,30)]: s.send(m)
         s.sync()
 
@@ -188,10 +204,10 @@ class ShortTests(BrokerTest):
 
     def test_send_receive(self):
         """Verify sequence numbers of messages sent by qpid-send"""
-        primary = self.ha_broker(name="primary")
-        self.promote(primary)
-        backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port())
-        backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port())
+        primary = HaBroker(self, name="primary")
+        primary.promote()
+        backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+        backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
         sender = self.popen(
             ["qpid-send",
              "--broker", primary.host_port(),
@@ -222,9 +238,9 @@ class ShortTests(BrokerTest):
     def test_failover(self):
         """Verify that backups rejects connections and that fail-over works in python client"""
         getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover
-        primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL)
-        self.promote(primary)
-        backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+        primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+        primary.promote()
+        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
         # Check that backup rejects normal connections
         try:
             backup.connect()
@@ -241,14 +257,14 @@ class ShortTests(BrokerTest):
         sender.send("foo")
         primary.kill()
         assert retry(lambda: not is_running(primary.pid))
-        self.promote(backup)
+        backup.promote()
         self.assert_browse_retry(s, "q", ["foo"])
         c.close()
 
     def test_failover_cpp(self):
-        primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL)
-        self.promote(primary)
-        backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+        primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+        primary.promote()
+        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
         url="%s,%s"%(primary.host_port(), backup.host_port())
         
primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
         self.wait_backup(backup, "q")
@@ -262,7 +278,7 @@ class ShortTests(BrokerTest):
 
         primary.kill()
         assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
-        self.promote(backup)
+        backup.promote()
         n = receiver.received       # Make sure we are still running
         assert retry(lambda: receiver.received > n + 10)
         sender.stop()

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/tests/reliable_replication_test
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/tests/reliable_replication_test?rev=1245542&r1=1245541&r2=1245542&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/tests/reliable_replication_test (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/tests/reliable_replication_test Fri Feb 17 14:15:16 2012
@@ -65,12 +65,9 @@ receive() {
 }
 
 bounce_link() {
-    echo "Destroying link..."
     $PYTHON_COMMANDS/qpid-route link del "localhost:$BROKER_B" "localhost:$BROKER_A"
-    echo "Link destroyed; recreating route..."
-    sleep 2
+#    sleep 2
     $PYTHON_COMMANDS/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
-    echo "Route re-established"
 }
 
 if test -d ${PYTHON_DIR} && test -e $REPLICATING_LISTENER_LIB && test -e $REPLICATION_EXCHANGE_LIB ;  then
@@ -78,16 +75,11 @@ if test -d ${PYTHON_DIR} && test -e $REP
     for i in `seq 1 100000`; do echo Message $i; done > replicated.expected
     send &
     receive &
-    for i in `seq 1 5`; do sleep 10; bounce_link; done;
+    for i in `seq 1 3`; do sleep 1; bounce_link; done;
     wait
     #check that received list is identical to sent list
-    diff replicated.actual replicated.expected || FAIL=1
-    if [[ $FAIL ]]; then
-        echo reliable replication test failed: expectations not met!
-        exit 1
-    else 
-        echo replication reliable in the face of link failures
-        rm -f replication.actual replication.expected replication-source.log replication-dest.log qpidd-repl.port
-    fi
+    diff replicated.actual replicated.expected || exit 1
+    rm -f replication.actual replication.expected replication-source.log replication-dest.log qpidd-repl.port
+    true
 fi
 

Modified: qpid/branches/qpid-3603-7/qpid/tools/src/py/qpid-ha-tool
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/tools/src/py/qpid-ha-tool?rev=1245542&r1=1245541&r2=1245542&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/tools/src/py/qpid-ha-tool (original)
+++ qpid/branches/qpid-3603-7/qpid/tools/src/py/qpid-ha-tool Fri Feb 17 14:15:16 2012
@@ -34,38 +34,43 @@ op.add_option("-q", "--query", action="s
              help="Show the current HA settings on the broker.")
 
 class HaBroker:
-    def __init__(self, broker):
-        self.session = qmf.console.Session()
-       self.qmf_broker = self.session.addBroker(broker, client_properties={"qpid.ha-admin":1})
-       ha_brokers = self.session.getObjects(_class="habroker", _package="org.apache.qpid.ha")
+    def __init__(self, session, broker):
+        self.session = session
+       self.qmf_broker = self.session.addBroker(
+            broker, client_properties={"qpid.ha-admin":1})
+       ha_brokers = self.session.getObjects(
+            _class="habroker", _package="org.apache.qpid.ha")
        if (not ha_brokers): raise Exception("Broker does not have HA enabled.")
        self.ha_broker = ha_brokers[0]
 
     def query(self):
        self.ha_broker.update()
        print "status=", self.ha_broker.status
-       print "client-addresses=", self.ha_broker.clientAddresses
        print "broker-addresses=", self.ha_broker.brokerAddresses
+       print "client-addresses=", self.ha_broker.clientAddresses
 
 def main(argv):
     try:
        opts, args = op.parse_args(argv)
        if len(args) >1: broker = args[1]
        else: broker = "localhost:5672"
-       hb = HaBroker(broker)
+        session = qmf.console.Session()
        try:
+            hb = HaBroker(session, broker)
            action=False
-           if opts.promote: hb.ha_broker.promote(); action=True
-           if opts.client_addresses: hb.ha_broker.setClientAddresses(opts.client_addresses); action=True
-           if opts.broker_addresses: hb.ha_broker.setBrokerAddresses(opts.broker_addresses); action=True
+           if opts.promote:
+                hb.ha_broker.promote(); action=True
+           if opts.broker_addresses:
+                hb.ha_broker.setBrokerAddresses(opts.broker_addresses); action=True
+           if opts.client_addresses:
+                hb.ha_broker.setClientAddresses(opts.client_addresses); action=True
            if opts.query or not action: hb.query()
            return 0
        finally:
-           hb.session.close()          # Avoid errors shutting down threads.
+           session.close()      # Avoid errors shutting down threads.
     except Exception, e:
-       raise                           # FIXME aconway 2012-01-30:
         print e
-        return -1
+        return 1
 
 if __name__ == "__main__":
     sys.exit(main(sys.argv))



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to