Author: robbie
Date: Mon Nov 7 17:40:03 2011
New Revision: 1198834

URL: http://svn.apache.org/viewvc?rev=1198834&view=rev
Log:
QPID-3446: Unregister existing subscriptions when closing the connections [during shutdown], update lock usage in order to avoid deadlock.
Applied patch from Oleksandr Rudyy <oru...@gmail.com> and myself.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1198834&r1=1198833&r2=1198834&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Mon Nov 7 17:40:03 2011
@@ -460,7 +460,6 @@ public abstract class SubscriptionImpl i
     public void queueDeleted(AMQQueue queue)
     {
         _deleted.set(true);
-//        _channel.queueDeleted(queue);
     }
 
     public boolean filtersMessages()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1198834&r1=1198833&r2=1198834&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Mon Nov 7 17:40:03 2011
@@ -259,10 +259,11 @@ public class ServerConnection extends Co
 
     public void close(AMQConstant cause, String message) throws AMQException
     {
+        closeSubscriptions();
         ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
         try
         {
-            replyCode = ConnectionCloseCode.get(cause.getCode());
+            replyCode = ConnectionCloseCode.get(cause.getCode());
         }
         catch (IllegalArgumentException iae)
         {
@@ -399,4 +400,20 @@ public class ServerConnection extends Co
     {
         return _authorizedPrincipal.getName();
     }
+
+    @Override
+    public void closed()
+    {
+        closeSubscriptions();
+        super.closed();
+    }
+
+    private void closeSubscriptions()
+    {
+        for (Session ssn : getChannels())
+        {
+            ((ServerSession)ssn).unregisterSubscriptions();
+        }
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1198834&r1=1198833&r2=1198834&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Nov 7 17:40:03 2011
@@ -415,19 +415,18 @@ public class ServerSession extends Sessi
             {
                 queue.unregisterSubscription(sub);
             }
-
         }
         catch (AMQException e)
         {
             // TODO
-            _logger.error("Failed to unregister subscription", e);
+            _logger.error("Failed to unregister subscription :" + e.getMessage(), e);
         }
         finally
         {
             sub.releaseSendLock();
         }
     }
-
+
     public boolean isTransactional()
     {
         // this does not look great but there should only be one "non-transactional"
@@ -686,12 +685,17 @@ public class ServerSession extends Sessi
     {
         // unregister subscriptions in order to prevent sending of new messages
         // to subscriptions with closing session
+        unregisterSubscriptions();
+
+        super.close();
+    }
+
+    void unregisterSubscriptions()
+    {
         final Collection<Subscription_0_10> subscriptions = getSubscriptions();
         for (Subscription_0_10 subscription_0_10 : subscriptions)
         {
             unregister(subscription_0_10);
         }
-
-        super.close();
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1198834&r1=1198833&r2=1198834&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Mon Nov 7 17:40:03 2011
@@ -1261,11 +1261,10 @@ public class ServerSessionDelegate exten
     {
         setThreadSubject(session);
 
-        for(Subscription_0_10 sub : getSubscriptions(session))
-        {
-            ((ServerSession)session).unregister(sub);
-        }
-        ((ServerSession)session).onClose();
+        ServerSession serverSession = (ServerSession)session;
+
+        serverSession.unregisterSubscriptions();
+        serverSession.onClose();
     }
 
     @Override
@@ -1274,11 +1273,6 @@ public class ServerSessionDelegate exten
         closed(session);
     }
 
-    public Collection<Subscription_0_10> getSubscriptions(Session session)
-    {
-        return ((ServerSession)session).getSubscriptions();
-    }
-
     private void setThreadSubject(Session session)
     {
         final ServerConnection scon = (ServerConnection) session.getConnection();

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org