Author: robbie
Date: Fri Nov 11 17:31:41 2011
New Revision: 1200979

URL: http://svn.apache.org/viewvc?rev=1200979&view=rev
Log:
QPID-3446: Unregister existing subscriptions when closing the connections 
[during shutdown], update lock usage in order to avoid deadlock.
    
Applied patch from Oleksandr Rudyy<oru...@gmail.com> and myself.
    
Merged from trunk r1198834

Modified:
    
qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    
qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
    
qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    
qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java

Modified: 
qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1200979&r1=1200978&r2=1200979&view=diff
==============================================================================
--- qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Fri Nov 11 17:31:41 2011
@@ -460,7 +460,6 @@ public abstract class SubscriptionImpl i
     public void queueDeleted(AMQQueue queue)
     {
         _deleted.set(true);
-//        _channel.queueDeleted(queue);
     }
 
     public boolean filtersMessages()

Modified: 
qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1200979&r1=1200978&r2=1200979&view=diff
==============================================================================
--- qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Fri Nov 11 17:31:41 2011
@@ -259,10 +259,11 @@ public class ServerConnection extends Co
 
     public void close(AMQConstant cause, String message) throws AMQException
     {
+        closeSubscriptions();
         ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
         try
         {
-               replyCode = ConnectionCloseCode.get(cause.getCode());
+            replyCode = ConnectionCloseCode.get(cause.getCode());
         }
         catch (IllegalArgumentException iae)
         {
@@ -399,4 +400,20 @@ public class ServerConnection extends Co
     {
         return _authorizedPrincipal.getName();
     }
+
+    @Override
+    public void closed()
+    {
+        closeSubscriptions();
+        super.closed();
+    }
+
+    private void closeSubscriptions()
+    {
+        for (Session ssn : getChannels())
+        {
+            ((ServerSession)ssn).unregisterSubscriptions();
+        }
+    }
+
 }

Modified: 
qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1200979&r1=1200978&r2=1200979&view=diff
==============================================================================
--- qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Fri Nov 11 17:31:41 2011
@@ -415,19 +415,18 @@ public class ServerSession extends Sessi
             {
                 queue.unregisterSubscription(sub);
             }
-
         }
         catch (AMQException e)
         {
             // TODO
-            _logger.error("Failed to unregister subscription", e);
+            _logger.error("Failed to unregister subscription :" + e.getMessage(), e);
         }
         finally
         {
             sub.releaseSendLock();
         }
     }
-    
+
     public boolean isTransactional()
     {
         // this does not look great but there should only be one "non-transactional"
@@ -686,12 +685,17 @@ public class ServerSession extends Sessi
     {
         // unregister subscriptions in order to prevent sending of new messages
         // to subscriptions with closing session
+        unregisterSubscriptions();
+
+        super.close();
+    }
+
+    void unregisterSubscriptions()
+    {
         final Collection<Subscription_0_10> subscriptions = getSubscriptions();
         for (Subscription_0_10 subscription_0_10 : subscriptions)
         {
             unregister(subscription_0_10);
         }
-
-        super.close();
     }
 }

Modified: 
qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1200979&r1=1200978&r2=1200979&view=diff
==============================================================================
--- qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/0.14/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Fri Nov 11 17:31:41 2011
@@ -1261,11 +1261,10 @@ public class ServerSessionDelegate exten
     {
         setThreadSubject(session);
 
-        for(Subscription_0_10 sub : getSubscriptions(session))
-        {
-            ((ServerSession)session).unregister(sub);
-        }
-        ((ServerSession)session).onClose();
+        ServerSession serverSession = (ServerSession)session;
+
+        serverSession.unregisterSubscriptions();
+        serverSession.onClose();
     }
 
     @Override
@@ -1274,11 +1273,6 @@ public class ServerSessionDelegate exten
         closed(session);
     }
 
-    public Collection<Subscription_0_10> getSubscriptions(Session session)
-    {
-        return ((ServerSession)session).getSubscriptions();
-    }
-
     private void setThreadSubject(Session session)
     {
         final ServerConnection scon = (ServerConnection) session.getConnection();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to