Author: aconway
Date: Wed Apr 18 14:59:31 2012
New Revision: 1327532

URL: http://svn.apache.org/viewvc?rev=1327532&view=rev
Log:
QPID-3352: Fix test for failed session to avoid confusion with as yet 
uninitialised session

Previously, Link was using sessionHandler::isReady() to determine if a
bridge had failed and needed recovery.  However this incorrectly
recovers bridges that are not yet initialized as well as those that
have failed.

This was causing sporadic core dumps in several of the ha_tests.py
tests, in particular test_backup_acquired.

This patch adds a callback to notify the bridge when the session is
detached, and uses this as the criterion for recovering a bridge.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    qpid/trunk/qpid/cpp/src/tests/run_federation_tests

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1327532&r1=1327531&r2=1327532&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Wed Apr 18 14:59:31 2012
@@ -62,7 +62,7 @@ Bridge::Bridge(Link* _link, framing::Cha
                InitializeCallback init) :
     link(_link), id(_id), args(_args), mgmtObject(0),
     listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), 
persistenceId(0),
-    initialize(init)
+    initialize(init), detached(false)
 {
     std::stringstream title;
     title << id << "_" << name;
@@ -85,11 +85,14 @@ Bridge::~Bridge()
 
 void Bridge::create(Connection& c)
 {
+    detached = false;           // Reset detached in case we are recovering.
     connState = &c;
     conn = &c;
     FieldTable options;
     if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
     SessionHandler& sessionHandler = c.getChannel(id);
+    sessionHandler.setDetachedCallback(
+        boost::bind(&Bridge::sessionDetached, shared_from_this()));
     if (args.i_srcIsLocal) {
         if (args.i_dynamic)
             throw Exception("Dynamic routing not supported for push routes");
@@ -179,12 +182,6 @@ void Bridge::destroy()
     listener(this);
 }
 
-bool Bridge::isSessionReady() const
-{
-    SessionHandler& sessionHandler = conn->getChannel(id);
-    return sessionHandler.ready();
-}
-
 void Bridge::setPersistenceId(uint64_t pId) const
 {
     persistenceId = pId;
@@ -336,4 +333,8 @@ const string& Bridge::getLocalTag() cons
     return link->getBroker()->getFederationTag();
 }
 
+void Bridge::sessionDetached() {
+    detached = true;
+}
+
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=1327532&r1=1327531&r2=1327532&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Wed Apr 18 14:59:31 2012
@@ -33,6 +33,7 @@
 #include "qmf/org/apache/qpid/broker/Bridge.h"
 
 #include <boost/function.hpp>
+#include <boost/enable_shared_from_this.hpp>
 #include <memory>
 
 namespace qpid {
@@ -44,7 +45,10 @@ class Link;
 class LinkRegistry;
 class SessionHandler;
 
-class Bridge : public PersistableConfig, public management::Manageable, public 
Exchange::DynamicBridge
+class Bridge : public PersistableConfig,
+               public management::Manageable,
+               public Exchange::DynamicBridge,
+               public boost::enable_shared_from_this<Bridge>
 {
 public:
     typedef boost::shared_ptr<Bridge> shared_ptr;
@@ -63,7 +67,7 @@ public:
     void destroy();
     bool isDurable() { return args.i_durable; }
 
-    bool isSessionReady() const;
+    bool isDetached() const { return detached; }
 
     management::ManagementObject* GetManagementObject() const;
     management::Manageable::status_t ManagementMethod(uint32_t methodId,
@@ -90,6 +94,9 @@ public:
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return 
args; }
 
 private:
+    // Callback when the bridge's session is detached.
+    void sessionDetached();
+
     struct PushHandler : framing::FrameHandler {
         PushHandler(Connection* c) { conn = c; }
         void handle(framing::AMQFrame& frame);
@@ -112,7 +119,7 @@ private:
     ConnectionState* connState;
     Connection* conn;
     InitializeCallback initialize;
-
+    bool detached;              // Set when session is detached.
     bool resetProxy();
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1327532&r1=1327531&r2=1327532&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Wed Apr 18 14:59:31 2012
@@ -312,7 +312,7 @@ void Link::ioThreadProcessing()
     // check for bridge session errors and recover
     if (!active.empty()) {
         Bridges::iterator removed = std::remove_if(
-            active.begin(), active.end(), 
!boost::bind(&Bridge::isSessionReady, _1));
+            active.begin(), active.end(), boost::bind(&Bridge::isDetached, 
_1));
         for (Bridges::iterator i = removed; i != active.end(); ++i) {
             Bridge::shared_ptr  bridge = *i;
             bridge->closed();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1327532&r1=1327531&r2=1327532&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Wed Apr 18 14:59:31 
2012
@@ -64,6 +64,7 @@ void SessionHandler::handleDetach() {
     if (session.get())
         connection.getBroker().getSessionManager().detach(session);
     assert(!session.get());
+    if (detachedCallback) detachedCallback();
     connection.closeChannel(channel.get());
 }
 
@@ -117,4 +118,8 @@ void SessionHandler::attached(const std:
     }
 }
 
+void SessionHandler::setDetachedCallback(boost::function<void()> cb) {
+    detachedCallback = cb;
+}
+
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=1327532&r1=1327531&r2=1327532&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Wed Apr 18 14:59:31 
2012
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,6 +25,7 @@
 #include "qpid/amqp_0_10/SessionHandler.h"
 #include "qpid/broker/SessionHandler.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
+#include <boost/function.hpp>
 
 namespace qpid {
 class SessionState;
@@ -61,7 +62,7 @@ class SessionHandler : public amqp_0_10:
      * This proxy is for sending such commands. In a clustered broker it will 
take steps
      * to synchronize command order across the cluster. In a stand-alone broker
      * it is just a synonym for getProxy()
-     */  
+     */
     framing::AMQP_ClientProxy& getClusterOrderProxy() {
         return clusterOrderProxy.get() ? *clusterOrderProxy : proxy;
     }
@@ -70,6 +71,8 @@ class SessionHandler : public amqp_0_10:
     void attached(const std::string& name);//used by 'pushing' inter-broker 
bridges
     void attachAs(const std::string& name);//used by 'pulling' inter-broker 
bridges
 
+    void setDetachedCallback(boost::function<void()> cb);
+
   protected:
     virtual void setState(const std::string& sessionName, bool force);
     virtual qpid::SessionState* getState();
@@ -91,6 +94,7 @@ class SessionHandler : public amqp_0_10:
     framing::AMQP_ClientProxy proxy;
     std::auto_ptr<SessionState> session;
     std::auto_ptr<SetChannelProxy> clusterOrderProxy;
+    boost::function<void ()> detachedCallback;
 };
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/tests/run_federation_tests
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_federation_tests?rev=1327532&r1=1327531&r2=1327532&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_federation_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_federation_tests Wed Apr 18 14:59:31 2012
@@ -33,16 +33,13 @@ else
     SKIPTESTS='-i *_xml'    # note: single quotes prevent expansion of *
 fi
 
+QPIDD_CMD="../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no 
--log-enable=info+ --log-enable=debug+:Bridge --log-to-file"
 start_brokers() {
-    ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
-    LOCAL_PORT=`cat qpidd.port`
-    ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
-    REMOTE_PORT=`cat qpidd.port`
-
-    ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
-    REMOTE_B1=`cat qpidd.port`
-    ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
-    REMOTE_B2=`cat qpidd.port`
+    rm -f fed_local.log fed_remote.log fed_b1.log fed_b2.log
+    LOCAL_PORT=$($QPIDD_CMD fed_local.log)
+    REMOTE_PORT=$($QPIDD_CMD fed_remote.log)
+    REMOTE_B1=$($QPIDD_CMD fed_b1.log)
+    REMOTE_B2=$($QPIDD_CMD fed_b2.log)
 }
 
 stop_brokers() {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to