Author: robbie Date: Mon Sep 10 12:39:44 2012 New Revision: 1382799 URL: http://svn.apache.org/viewvc?rev=1382799&view=rev Log: QPID-4289: Fix 0-8/0-9/0-9-1 failover issues
Applied patch from Philip Harvey <p...@philharveyonline.com> and Oleksandr Rudyy <oru...@gmail.com> Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java qpid/trunk/qpid/java/test-profiles/CPPExcludes Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java?rev=1382799&r1=1382798&r2=1382799&view=diff ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java (original) +++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java Mon Sep 10 12:39:44 2012 @@ -31,6 +31,7 @@ import org.apache.qpid.client.AMQConnect import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestUtils; import com.sleepycat.je.rep.ReplicationConfig; @@ -134,7 +135,10 @@ public class HAClusterBlackboxTest exten public void assertFailoverOccurs(long delay) throws InterruptedException { - _failoverLatch.await(delay, TimeUnit.MILLISECONDS); + if (!_failoverLatch.await(delay, TimeUnit.MILLISECONDS)) + { + LOGGER.warn("Test thread dump:\n\n" + TestUtils.dumpThreads() + "\n"); + } assertEquals("Failover did not occur", 0, _failoverLatch.getCount()); } Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java?rev=1382799&r1=1382798&r2=1382799&view=diff ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java (original) +++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java Mon Sep 10 12:39:44 2012 @@ -204,7 +204,7 @@ public class HATestClusterCreator public void stopNode(final int brokerPortNumber) { - _testcase.stopBroker(brokerPortNumber); + _testcase.killBroker(brokerPortNumber); } public void stopCluster() throws Exception Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1382799&r1=1382798&r2=1382799&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Sep 10 12:39:44 2012 @@ -1080,7 +1080,7 @@ public class AMQConnection extends Close return _started; } - protected final boolean isConnected() + public final boolean isConnected() { return _connected; } Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1382799&r1=1382798&r2=1382799&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Mon Sep 10 12:39:44 2012 @@ -90,12 +90,13 @@ public class AMQConnectionDelegate_8_0 i public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException { + if (_logger.isDebugEnabled()) + { + _logger.debug("Connecting to broker:" + brokerDetail); + } final Set<AMQState> openOrClosedStates = EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); - - StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); - ConnectionSettings settings = brokerDetail.buildConnectionSettings(); settings.setProtocol(brokerDetail.getTransport()); @@ -126,6 +127,8 @@ public class AMQConnectionDelegate_8_0 i OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), sslContext); _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender())); + + StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); _conn.getProtocolHandler().getProtocolSession().init(); // this blocks until the connection has been set up or when an error // has prevented the connection being set up Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=1382799&r1=1382798&r2=1382799&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Mon Sep 10 12:39:44 2012 @@ -20,6 +20,8 @@ */ package org.apache.qpid.client.handler; +import java.nio.ByteBuffer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +35,7 @@ import org.apache.qpid.framing.AMQShortS import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.Sender; public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody> { @@ -91,18 +94,15 @@ public class ConnectionCloseMethodHandle } finally { + Sender<ByteBuffer> sender = session.getSender(); if (error != null) { session.notifyError(error); - } - - // Close the protocol Session, including any open TCP connections - session.closeProtocolSession(); + } - // Closing the session should not introduce a race condition as this thread will continue to propgate any - // exception in to the exceptionCaught method of the SessionHandler. - // Any sessionClosed event should occur after this. + // Close the open TCP connection + sender.close(); } } Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1382799&r1=1382798&r2=1382799&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Sep 10 12:39:44 2012 @@ -67,6 +67,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the @@ -210,48 +211,67 @@ public class AMQProtocolHandler implemen } else { - _logger.debug("Session closed called with failover state currently " + _failoverState); - - // reconnetablility was introduced here so as not to disturb the client as they have made their intentions - // known through the policy settings. - - if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) - { - _logger.debug("FAILOVER STARTING"); - if (_failoverState == FailoverState.NOT_STARTED) - { - _failoverState = FailoverState.IN_PROGRESS; - startFailoverThread(); - } - else - { - _logger.debug("Not starting failover as state currently " + _failoverState); - } - } - else + // Use local variable to keep flag whether fail-over allowed or not, + // in order to execute AMQConnection#exceptionRecievedout out of synchronization block, + // otherwise it might deadlock with failover mutex + boolean failoverNotAllowed = false; + synchronized (this) { - _logger.debug("Failover not allowed by policy."); // or already in progress? - if (_logger.isDebugEnabled()) { - _logger.debug(_connection.getFailoverPolicy().toString()); + _logger.debug("Session closed called with failover state " + _failoverState); } - if (_failoverState != FailoverState.IN_PROGRESS) + // reconnetablility was introduced here so as not to disturb the client as they have made their intentions + // known through the policy settings. + if (_failoverState == FailoverState.NOT_STARTED) { - _logger.debug("sessionClose() not allowed to failover"); - _connection.exceptionReceived(new AMQDisconnectedException( - "Server closed connection and reconnection " + "not permitted.", - _stateManager.getLastException())); + // close the sender + try + { + _sender.close(); + } + catch (Exception e) + { + _logger.warn("Exception occured on closing the sender", e); + } + if (_connection.failoverAllowed()) + { + _failoverState = FailoverState.IN_PROGRESS; + + _logger.debug("FAILOVER STARTING"); + startFailoverThread(); + } + else if (_connection.isConnected()) + { + failoverNotAllowed = true; + if (_logger.isDebugEnabled()) + { + _logger.debug("Failover not allowed by policy:" + _connection.getFailoverPolicy()); + } + } + else + { + _logger.debug("We are in process of establishing the initial connection"); + } } else { - _logger.debug("sessionClose() failover in progress"); + _logger.debug("Not starting the failover thread as state currently " + _failoverState); } } + + if (failoverNotAllowed) + { + _connection.exceptionReceived(new AMQDisconnectedException( + "Server closed connection and reconnection not permitted.", _stateManager.getLastException())); + } } - _logger.debug("Protocol Session [" + this + "] closed"); + if (_logger.isDebugEnabled()) + { + _logger.debug("Protocol Session [" + this + "] closed"); + } } /** See {@link FailoverHandler} to see rationale for separate thread. */ @@ -297,14 +317,17 @@ public class AMQProtocolHandler implemen */ public void exception(Throwable cause) { - if (_failoverState == FailoverState.NOT_STARTED) + boolean connectionClosed = (cause instanceof AMQConnectionClosedException || cause instanceof IOException); + if (connectionClosed) { - if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) + _network.close(); + } + FailoverState state = getFailoverState(); + if (state == FailoverState.NOT_STARTED) + { + if (connectionClosed) { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); - // this will attempt failover - _network.close(); - closed(); } else { @@ -319,7 +342,7 @@ public class AMQProtocolHandler implemen } // we reach this point if failover was attempted and failed therefore we need to let the calling app // know since we cannot recover the situation - else if (_failoverState == FailoverState.FAILED) + else if (state == FailoverState.FAILED) { _logger.error("Exception caught by protocol handler: " + cause, cause); @@ -329,6 +352,10 @@ public class AMQProtocolHandler implemen propagateExceptionToAllWaiters(amqe); _connection.exceptionReceived(cause); } + else + { + _logger.warn("Exception caught by protocol handler: " + cause, cause); + } } /** @@ -792,14 +819,14 @@ public class AMQProtocolHandler implemen return _protocolSession; } - FailoverState getFailoverState() + synchronized FailoverState getFailoverState() { return _failoverState; } - public void setFailoverState(FailoverState failoverState) + public synchronized void setFailoverState(FailoverState failoverState) { - _failoverState = failoverState; + _failoverState= failoverState; } public byte getProtocolMajorVersion() @@ -843,6 +870,11 @@ public class AMQProtocolHandler implemen _sender = sender; } + protected Sender<ByteBuffer> getSender() + { + return _sender; + } + /** @param delay delay in seconds (not ms) */ void initHeartbeats(int delay) { Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1382799&r1=1382798&r2=1382799&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Sep 10 12:39:44 2012 @@ -48,6 +48,8 @@ import org.apache.qpid.transport.Transpo import javax.jms.JMSException; import javax.security.sasl.SaslClient; + +import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -372,6 +374,11 @@ public class AMQProtocolSession implemen } } + public Sender<ByteBuffer> getSender() + { + return _protocolHandler.getSender(); + } + public void failover(String host, int port) { _protocolHandler.failover(host, port); Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1382799&r1=1382798&r2=1382799&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Mon Sep 10 12:39:44 2012 @@ -1313,7 +1313,7 @@ public class FailoverBehaviourTest exten * @param acknowledgeMode session acknowledge mode * @throws JMSException */ - private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException + private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws Exception { initDelayedFailover(acknowledgeMode); @@ -1324,9 +1324,14 @@ public class FailoverBehaviourTest exten failBroker(getFailingPort()); + // wait until failover is started + _failoverStarted.await(5, TimeUnit.SECONDS); + // test whether session#close blocks while failover is in progress _consumerSession.close(); + assertTrue("Failover has not completed yet but session was closed", _failoverComplete.await(5, TimeUnit.SECONDS)); + assertFailoverException(); } @@ -1360,10 +1365,8 @@ public class FailoverBehaviourTest exten * @param acknowledgeMode session acknowledge mode * @throws JMSException */ - private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException + private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws Exception { - setDelayedFailoverPolicy(); - QueueBrowser browser = prepareQueueBrowser(acknowledgeMode); @SuppressWarnings("unchecked") @@ -1373,8 +1376,13 @@ public class FailoverBehaviourTest exten failBroker(getFailingPort()); + // wait until failover is started + _failoverStarted.await(5, TimeUnit.SECONDS); + browser.close(); + assertTrue("Failover has not completed yet but browser was closed", _failoverComplete.await(5, TimeUnit.SECONDS)); + assertFailoverException(); } @@ -1402,5 +1410,11 @@ public class FailoverBehaviourTest exten ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy); return failoverPolicy; } - + + @Override + public void failBroker(int port) + { + killBroker(port); + } + } Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java?rev=1382799&view=auto ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java (added) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java Mon Sep 10 12:39:44 2012 @@ -0,0 +1,255 @@ +package org.apache.qpid.client.failover; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQBrokerDetails; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.server.store.MessageStoreConstants; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestUtils; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.util.FileUtils; + +public class MultipleBrokersFailoverTest extends QpidBrokerTestCase implements ConnectionListener +{ + private static final Logger _logger = Logger.getLogger(MultipleBrokersFailoverTest.class); + + private static final String FAILOVER_VIRTUAL_HOST = "failover"; + private static final String NON_FAILOVER_VIRTUAL_HOST = "nonfailover"; + private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'"; + private static final int FAILOVER_RETRIES = 1; + private static final int FAILOVER_CONNECTDELAY = 1000; + private int[] _brokerPorts; + private AMQConnectionURL _connectionURL; + private Connection _connection; + private CountDownLatch _failoverComplete; + private CountDownLatch _failoverStarted; + private Session _consumerSession; + private Destination _destination; + private MessageConsumer _consumer; + private Session _producerSession; + private MessageProducer _producer; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + int port = findFreePort(); + _brokerPorts = new int[4]; + _connectionURL = new AMQConnectionURL("amqp://guest:guest@test/" + FAILOVER_VIRTUAL_HOST + + "?&failover='roundrobin?cyclecount='1''"); + + // we need to create 4 brokers: + // 1st broker will be running in test JVM and will not have failover host (only tcp connection will established, amqp connection will be closed) + // 2d broker will be spawn in separate JVM and should have a failover host (amqp connection should be established) + // 3d broker will be spawn in separate JVM and should not have a failover host (only tcp connection will established, amqp connection will be closed) + // 4d broker will be spawn in separate JVM and should have a failover host (amqp connection should be established) + + // the test should connect to the second broker first and fail over to the forth broker + // after unsuccessful try to establish the connection to the 3d broker + for (int i = 0; i < _brokerPorts.length; i++) + { + if (i > 0) + { + port = getNextAvailable(port + 1); + } + _brokerPorts[i] = port; + + XMLConfiguration testConfiguration = new XMLConfiguration(); + testConfiguration.addProperty("management.enabled", "false"); + + XMLConfiguration testVirtualhosts = new XMLConfiguration(); + String host = null; + if (i == 1 || i == _brokerPorts.length - 1) + { + host = FAILOVER_VIRTUAL_HOST; + } + else + { + host = NON_FAILOVER_VIRTUAL_HOST; + } + testVirtualhosts.addProperty("virtualhost.name", host); + testVirtualhosts.addProperty("virtualhost." + host + ".store.class", getTestProfileMessageStoreClassName()); + testVirtualhosts.addProperty( + "virtualhost." + host + ".store." + MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, "${QPID_WORK}/" + + host); + + startBroker(port, testConfiguration, testVirtualhosts); + revertSystemProperties(); + + _connectionURL.addBrokerDetails(new AMQBrokerDetails(String.format(BROKER_PORTION_FORMAT, port, + FAILOVER_CONNECTDELAY, FAILOVER_RETRIES))); + } + _connection = getConnection(_connectionURL); + ((AMQConnection) _connection).setConnectionListener(this); + _failoverComplete = new CountDownLatch(1); + _failoverStarted = new CountDownLatch(1); + } + + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + for (int i = 0; i < _brokerPorts.length; i++) + { + if (_brokerPorts[i] > 0) + { + stopBrokerSafely(_brokerPorts[i]); + FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort()); + } + } + + } + } + + public void startBroker() throws Exception + { + // noop, stop starting broker in super.tearDown() + } + + public void testFailoverOnBrokerKill() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + assertConnectionPort(_brokerPorts[1]); + + assertSendReceive(0); + + killBroker(_brokerPorts[1]); + + awaitForFailoverCompletion(FAILOVER_CONNECTDELAY * _brokerPorts.length * 2); + assertEquals("Failover is not started as expected", 0, _failoverStarted.getCount()); + + assertSendReceive(2); + assertConnectionPort(_brokerPorts[_brokerPorts.length - 1]); + } + + public void testFailoverOnBrokerStop() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + assertConnectionPort(_brokerPorts[1]); + + assertSendReceive(0); + + stopBroker(_brokerPorts[1]); + + awaitForFailoverCompletion(FAILOVER_CONNECTDELAY * _brokerPorts.length * 2); + assertEquals("Failover is not started as expected", 0, _failoverStarted.getCount()); + + assertSendReceive(1); + assertConnectionPort(_brokerPorts[_brokerPorts.length - 1]); + } + + private void assertConnectionPort(int brokerPort) + { + int connectionPort = ((AMQConnection)_connection).getActiveBrokerDetails().getPort(); + assertEquals("Unexpected broker port", brokerPort, connectionPort); + } + + private void assertSendReceive(int index) throws JMSException + { + Message message = createNextMessage(_producerSession, index); + _producer.send(message); + if (_producerSession.getTransacted()) + { + _producerSession.commit(); + } + Message receivedMessage = _consumer.receive(1000l); + assertReceivedMessage(receivedMessage, index); + if (_consumerSession.getTransacted()) + { + _consumerSession.commit(); + } + } + + private void awaitForFailoverCompletion(long delay) + { + _logger.info("Awaiting Failover completion.."); + try + { + if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS)) + { + _logger.warn("Test thread stack:\n\n" + TestUtils.dumpThreads()); + fail("Failover did not complete"); + } + } + catch (InterruptedException e) + { + fail("Test was interrupted:" + e.getMessage()); + } + } + + private void assertReceivedMessage(Message receivedMessage, int messageIndex) + { + assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage); + assertTrue( + "Failure to receive message [" + messageIndex + "], expected TextMessage but received " + receivedMessage, + receivedMessage instanceof TextMessage); + } + + private void init(int acknowledgeMode, boolean startConnection) throws JMSException + { + boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false; + + _consumerSession = _connection.createSession(isTransacted, acknowledgeMode); + _destination = _consumerSession.createQueue(getTestQueueName()); + _consumer = _consumerSession.createConsumer(_destination); + + if (startConnection) + { + _connection.start(); + } + + _producerSession = _connection.createSession(isTransacted, acknowledgeMode); + _producer = _producerSession.createProducer(_destination); + + } + + @Override + public void bytesSent(long count) + { + } + + @Override + public void bytesReceived(long count) + { + } + + @Override + public boolean preFailover(boolean redirect) + { + _failoverStarted.countDown(); + return true; + } + + @Override + public boolean preResubscribe() + { + return true; + } + + @Override + public void failoverComplete() + { + _failoverComplete.countDown(); + } +} Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java?rev=1382799&r1=1382798&r2=1382799&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java Mon Sep 10 12:39:44 2012 @@ -82,28 +82,7 @@ public class InternalBrokerHolder implem @Override public String dumpThreads() { - ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); - ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true); - StringBuilder dump = new StringBuilder(); - dump.append(String.format("%n")); - for (ThreadInfo threadInfo : threadInfos) - { - dump.append(threadInfo); - } - - long[] deadLocks = threadMXBean.findDeadlockedThreads(); - if (deadLocks != null && deadLocks.length > 0) - { - ThreadInfo[] deadlockedThreads = threadMXBean.getThreadInfo(deadLocks); - dump.append(String.format("%n")); - dump.append("Deadlock is detected!"); - dump.append(String.format("%n")); - for (ThreadInfo threadInfo : deadlockedThreads) - { - dump.append(threadInfo); - } - } - return dump.toString(); + return TestUtils.dumpThreads(); } @Override Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java?rev=1382799&view=auto ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java (added) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java Mon Sep 10 12:39:44 2012 @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.utils; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; + +public class TestUtils +{ + public static String dumpThreads() + { + ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true); + StringBuilder dump = new StringBuilder(); + dump.append(String.format("%n")); + for (ThreadInfo threadInfo : threadInfos) + { + dump.append(threadInfo); + } + + long[] deadLocks = threadMXBean.findDeadlockedThreads(); + if (deadLocks != null && deadLocks.length > 0) + { + ThreadInfo[] deadlockedThreads = threadMXBean.getThreadInfo(deadLocks); + dump.append(String.format("%n")); + dump.append("Deadlock is detected!"); + dump.append(String.format("%n")); + for (ThreadInfo threadInfo : deadlockedThreads) + { + dump.append(threadInfo); + } + } + return dump.toString(); + } +} Modified: qpid/trunk/qpid/java/test-profiles/CPPExcludes URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPExcludes?rev=1382799&r1=1382798&r2=1382799&view=diff ============================================================================== --- qpid/trunk/qpid/java/test-profiles/CPPExcludes (original) +++ qpid/trunk/qpid/java/test-profiles/CPPExcludes Mon Sep 10 12:39:44 2012 @@ -184,3 +184,6 @@ org.apache.qpid.disttest.* // Exclude java broker REST API tests org.apache.qpid.server.management.plugin.servlet.rest.* org.apache.qpid.systest.rest.acl.* + +// Exclude failover tests requiring virtual host functionality +org.apache.qpid.client.failover.MultipleBrokersFailoverTest#* \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org