Author: rgodfrey Date: Tue Feb 21 19:38:11 2012 New Revision: 1291964 URL: http://svn.apache.org/viewvc?rev=1291964&view=rev Log: QPID-3595 : Python Alternate Exchange tests fail against the Java Broker
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java qpid/trunk/qpid/java/test-profiles/python_tests/Java010PythonExcludes Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1291964&r1=1291963&r2=1291964&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Tue Feb 21 19:38:11 2012 @@ -231,23 +231,26 @@ public class ServerConnectionDelegate ex @Override public void sessionDetach(Connection conn, SessionDetach dtc) { - // To ensure a clean detach, we unregister any remaining subscriptions. Unregister ensures - // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the unregister + // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures + // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the stop // completes. - unregisterAllSubscriptions(conn, dtc); + stopAllSubscriptions(conn, dtc); + Session ssn = conn.getSession(dtc.getChannel()); + ((ServerSession)ssn).setClose(true); super.sessionDetach(conn, dtc); } - private void unregisterAllSubscriptions(Connection conn, SessionDetach dtc) + private void stopAllSubscriptions(Connection conn, SessionDetach dtc) { final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel()); final Collection<Subscription_0_10> subs = ssn.getSubscriptions(); for (Subscription_0_10 subscription_0_10 : subs) { - ssn.unregister(subscription_0_10); + subscription_0_10.stop(); } } + @Override public void sessionAttach(final Connection conn, final SessionAttach atc) { @@ -305,4 +308,4 @@ public class ServerConnectionDelegate ex { return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.VERSION_0_10); } -} \ No newline at end of file +} Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1291964&r1=1291963&r2=1291964&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Tue Feb 21 19:38:11 2012 @@ -704,7 +704,7 @@ public class ServerSession extends Sessi { if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty()) { - if(_blocking.compareAndSet(true,false)) + if(_blocking.compareAndSet(true,false) && !isClosing()) { _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); @@ -759,6 +759,16 @@ public class ServerSession extends Sessi } } + void stopSubscriptions() + { + final Collection<Subscription_0_10> subscriptions = getSubscriptions(); + for (Subscription_0_10 subscription_0_10 : subscriptions) + { + subscription_0_10.stop(); + } + } + + public void receivedComplete() { final Collection<Subscription_0_10> subscriptions = getSubscriptions(); @@ -900,6 +910,12 @@ public class ServerSession extends Sessi } } + + protected void setClose(boolean close) + { + super.setClose(close); + } + @Override public int compareTo(AMQSessionModel session) { Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1291964&r1=1291963&r2=1291964&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Tue Feb 21 19:38:11 2012 @@ -533,7 +533,18 @@ public class ServerSessionDelegate exten { if(!exchange.getTypeShortString().toString().equals(method.getType())) { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() +"."); + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to redeclare exchange: " + exchangeName + + " of type " + exchange.getTypeShortString() + + " to " + method.getType() +"."); + } + else if(method.hasAlternateExchange() + && !(method.getAlternateExchange().equals(exchange.getAlternateExchange().getName()))) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to change alternate exchange of: " + exchangeName + + " from " + exchange.getAlternateExchange() + + " to " + method.getAlternateExchange() +"."); } } @@ -1302,8 +1313,9 @@ public class ServerSessionDelegate exten ServerSession serverSession = (ServerSession)session; - serverSession.unregisterSubscriptions(); + serverSession.stopSubscriptions(); serverSession.onClose(); + serverSession.unregisterSubscriptions(); } @Override Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1291964&r1=1291963&r2=1291964&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Tue Feb 21 19:38:11 2012 @@ -161,7 +161,7 @@ public class Session extends SessionInvo this.expiry = expiry; } - void setClose(boolean close) + protected void setClose(boolean close) { this.closing = close; } Modified: qpid/trunk/qpid/java/test-profiles/python_tests/Java010PythonExcludes URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/python_tests/Java010PythonExcludes?rev=1291964&r1=1291963&r2=1291964&view=diff ============================================================================== --- qpid/trunk/qpid/java/test-profiles/python_tests/Java010PythonExcludes (original) +++ qpid/trunk/qpid/java/test-profiles/python_tests/Java010PythonExcludes Tue Feb 21 19:38:11 2012 @@ -64,8 +64,6 @@ qpid_tests.broker_0_10.message.MessageTe qpid_tests.broker_0_10.message.MessageTests.test_no_local_awkward #QPID-3595 Alternate Exchanges support requires work to be spec compliant. -qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_modify_existing_exchange_alternate -qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_queue_autodelete qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_queue_delete_no_match qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_reject_no_match qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_add_alternate_to_exchange --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org