Author: orudyy Date: Fri Jul 31 08:34:36 2015 New Revision: 1693542 URL: http://svn.apache.org/r1693542 Log: QPID-3521: Clear pre-dispatch queue in 0-8 client on failover process
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java qpid/java/trunk/test-profiles/cpp.excludes Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1693542&r1=1693541&r2=1693542&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Jul 31 08:34:36 2015 @@ -2324,6 +2324,15 @@ public abstract class AMQSession<C exten _failedOverDirty = true; } + // Also reset the delivery tag tracker, to insure we dont + // return the first <total number of msgs received on session> + // messages sent by the brokers following the first rollback + // after failover + _highestDeliveryTag.set(-1); + + _unacknowledgedMessageTags.clear(); + _prefetchedMessageTags.clear(); + _rollbackMark.set(-1); resubscribeProducers(); resubscribeConsumers(); @@ -2431,15 +2440,8 @@ public abstract class AMQSession<C exten void stop() throws QpidException { // Stop the server delivering messages to this session. - if (!(isClosed() || isClosing())) - { - suspendChannel(true); - } - - if (_dispatcher != null) - { - _dispatcher.setConnectionStopped(true); - } + suspendChannelIfNotClosing(); + stopExistingDispatcher(); } private void checkNotTransacted() throws JMSException @@ -3686,5 +3688,31 @@ public abstract class AMQSession<C exten { return _messageEncryptionHelper; } + + protected void drainDispatchQueueWithDispatcher() + { + if (!_queue.isEmpty()) + { + setUsingDispatcherForCleanup(true); + drainDispatchQueue(); + setUsingDispatcherForCleanup(false); + } + } + + protected void stopExistingDispatcher() + { + if (_dispatcher != null) + { + _dispatcher.setConnectionStopped(true); + } + } + + protected void suspendChannelIfNotClosing() throws QpidException + { + if (!(isClosed() || isClosing())) + { + suspendChannel(true); + } + } } Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1693542&r1=1693541&r2=1693542&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Jul 31 08:34:36 2015 @@ -1278,17 +1278,11 @@ public class AMQSession_0_10 extends AMQ @Override void resubscribe() throws QpidException { - // Also reset the delivery tag tracker, to insure we dont - // return the first <total number of msgs received on session> - // messages sent by the brokers following the first rollback - // after failover - getHighestDeliveryTag().set(-1); // Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to //messages that came from the old broker. _txRangeSet.clear(); _txSize = 0; - getUnacknowledgedMessageTags().clear(); - getPrefetchedMessageTags().clear(); + super.resubscribe(); getQpidSession().sync(); } @@ -1296,10 +1290,10 @@ public class AMQSession_0_10 extends AMQ @Override void stop() throws QpidException { - super.stop(); - setUsingDispatcherForCleanup(true); - drainDispatchQueue(); - setUsingDispatcherForCleanup(false); + // Stop the server delivering messages to this session. + suspendChannelIfNotClosing(); + drainDispatchQueueWithDispatcher(); + stopExistingDispatcher(); for (BasicMessageConsumer consumer : getConsumers()) { Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1693542&r1=1693541&r2=1693542&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Fri Jul 31 08:34:36 2015 @@ -179,6 +179,16 @@ public class AMQSession_0_8 extends AMQS getUnacknowledgedMessageTags().remove(deliveryTag); } + @Override + void resubscribe() throws QpidException + { + // drain dispatch queue + drainDispatchQueueWithDispatcher(); + + getDeliveredMessageTags().clear(); + super.resubscribe(); + } + public void sendQueueBind(final String queueName, final String routingKey, final Map<String,Object> arguments, final String exchangeName, final AMQDestination destination, final boolean nowait) throws QpidException, FailoverException Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1693542&r1=1693541&r2=1693542&view=diff ============================================================================== --- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original) +++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Fri Jul 31 08:34:36 2015 @@ -18,7 +18,14 @@ */ package org.apache.qpid.client; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import javax.jms.JMSException; @@ -28,10 +35,12 @@ import javax.jms.MessageProducer; import javax.jms.StreamMessage; import org.apache.qpid.client.message.AMQPEncodedListMessage; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.*; import org.apache.qpid.transport.Connection.SessionFactory; import org.apache.qpid.transport.Connection.State; +import org.apache.qpid.url.AMQBindingURL; /** * Tests AMQSession_0_10 methods. @@ -456,6 +465,91 @@ public class AMQSession_0_10Test extends assertNotNull("QueueQuery command was not sent", command); } + public void testResubscribe() throws Exception + { + AMQSession_0_10 session = createAMQSession_0_10(AMQSession_0_10.AUTO_ACKNOWLEDGE); + + AMQQueue queue1 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test1?routingkey='test1'&durable='true'")); + session.createProducer(queue1); + BasicMessageConsumer_0_10 consumer1 = (BasicMessageConsumer_0_10)session.createConsumer(queue1); + + AMQQueue queue2 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test2?routingkey='test2'")); + session.createProducer(queue2); + BasicMessageConsumer_0_10 consumer2 = (BasicMessageConsumer_0_10)session.createConsumer(queue2); + + UnprocessedMessage[] messages = new UnprocessedMessage[4]; + for (int i =0; i< messages.length;i++ ) + { + int consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag(); + int deliveryTag = i + 1; + messages[i]= createMockMessage(deliveryTag, consumerTag); + session.messageReceived(messages[i]); + if (deliveryTag % 2 == 0) + { + session.addUnacknowledgedMessage(deliveryTag); + } + } + + assertEquals("Unexpected highest delivery tag", 4, session.getHighestDeliveryTag().get()); + assertFalse("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty()); + assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers())); + + // verify test messages were not dispatched + for (UnprocessedMessage message: messages ) + { + verify(message, never()).dispatch(session); + } + + session.resubscribe(); + + assertEquals("Unexpected highest delivery tag", -1, session.getHighestDeliveryTag().get()); + assertTrue("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty()); + assertTrue("Unexpected pre-fetched message tags", session.getPrefetchedMessageTags().isEmpty()); + assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers())); + } + + public void testFailoverPrep() throws Exception + { + AMQSession_0_10 session = createAMQSession_0_10(AMQSession_0_10.AUTO_ACKNOWLEDGE); + + UnprocessedMessage[] messages = new UnprocessedMessage[4]; + for (int i =0; i< messages.length;i++ ) + { + int consumerTag = i % 2; + int deliveryTag = i + 1; + messages[i]= createMockMessage(deliveryTag, consumerTag); + session.messageReceived(messages[i]); + if (deliveryTag % 2 == 0) + { + session.addUnacknowledgedMessage(deliveryTag); + } + } + + // verify test messages were not dispatched + for (UnprocessedMessage message: messages ) + { + verify(message, never()).dispatch(session); + } + + session.failoverPrep(); + + // verify dispatcher queue is drained + for (UnprocessedMessage message: messages ) + { + verify(message).dispatch(session); + } + } + + private UnprocessedMessage createMockMessage(long deliveryTag, int consumerTag) + { + UnprocessedMessage message = mock(UnprocessedMessage.class); + when(message.getConsumerTag()).thenReturn(consumerTag); + when(message.getDeliveryTag()).thenReturn(deliveryTag); + return message; + } + + + private AMQAnyDestination createDestination() { AMQAnyDestination destination = null; Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java?rev=1693542&r1=1693541&r2=1693542&view=diff ============================================================================== --- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java (original) +++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java Fri Jul 31 08:34:36 2015 @@ -20,9 +20,25 @@ */ package org.apache.qpid.client; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; + import org.apache.qpid.QpidException; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.transport.TestNetworkConnection; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicConsumeOkBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.network.NetworkConnection; @@ -43,21 +59,8 @@ public class AMQSession_0_8Test extends { final String testQueueName = "tmp_127_0_0_1_1_1"; - _connection.setConnectionListener(new ConnectionListenerSupport() - { - @Override - public void bytesSent(long count) - { - try - { - _connection.getProtocolHandler().methodBodyReceived(1, new QueueDeclareOkBody(AMQShortString.valueOf(testQueueName), 0, 0)); - } - catch (QpidException e) - { - throw new RuntimeException(e); - } - } - }); + _connection.setConnectionListener(new MockReceiveConnectionListener(_connection, 1, + new QueueDeclareOkBody(AMQShortString.valueOf(testQueueName), 0, 0))); AMQSession_0_8 session = new AMQSession_0_8(_connection, 1, true, 0, 1, 1); @@ -70,4 +73,137 @@ public class AMQSession_0_8Test extends assertEquals("Unexpected queue name", testQueueName, queue.getAMQQueueName()); } + + public void testResubscribe() throws Exception + { + // to verify producer resubscribe set qpid.declare_exchanges=true + setTestSystemProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "true"); + setTestSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false"); + setTestSystemProperty(ClientProperties.QPID_BIND_QUEUES_PROP_NAME, "false"); + + AMQSession_0_8 session = new AMQSession_0_8(_connection, 1, true, 0, 1, 1); + + AMQQueue queue1 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test1?routingkey='test1'")); + + // expecting exchange declare Ok + MockReceiveConnectionListener listener = new MockReceiveConnectionListener(_connection, 1, new ExchangeDeclareOkBody()); + _connection.setConnectionListener(listener); + session.createProducer(queue1); + assertTrue("Not all expected commands have been sent on producer1 creation", listener.responsesEmpty()); + + // expecting exchange declare Ok, Channel flow Ok, Consume Ok + listener = new MockReceiveConnectionListener(_connection, 1, + new ExchangeDeclareOkBody(), new ChannelFlowOkBody(false), new BasicConsumeOkBody(AMQShortString.valueOf("1"))); + _connection.setConnectionListener(listener ); + BasicMessageConsumer_0_8 consumer1 = (BasicMessageConsumer_0_8)session.createConsumer(queue1); + assertTrue("Not all expected commands have been sent on consumer1 creation", listener.responsesEmpty()); + + // expecting exchange declare Ok + listener = new MockReceiveConnectionListener(_connection, 1, new ExchangeDeclareOkBody()); + _connection.setConnectionListener(listener); + AMQQueue queue2 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test2?routingkey='test2'")); + session.createProducer(queue2); + assertTrue("Not all expected commands have been sent on producer2 creation", listener.responsesEmpty()); + + + // expecting exchange declare Ok, Consume Ok + listener = new MockReceiveConnectionListener(_connection, 1, + new ExchangeDeclareOkBody(), new BasicConsumeOkBody(AMQShortString.valueOf("2"))); + _connection.setConnectionListener(listener); + + BasicMessageConsumer_0_8 consumer2 = (BasicMessageConsumer_0_8)session.createConsumer(queue2); + assertTrue("Not all expected commands have been sent on consumer2 creation", listener.responsesEmpty()); + + UnprocessedMessage[] messages = new UnprocessedMessage[4]; + for (int i =0; i< messages.length;i++ ) + { + int consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag(); + int deliveryTag = i + 1; + messages[i]= createMockMessage(deliveryTag, consumerTag); + session.messageReceived(messages[i]); + if (deliveryTag % 2 == 0) + { + session.addDeliveredMessage(deliveryTag); + } + else + { + session.addUnacknowledgedMessage(deliveryTag); + } + } + + assertEquals("Unexpected highest delivery tag", messages.length, session.getHighestDeliveryTag().get()); + assertFalse("Unexpected delivered message tags", session.getDeliveredMessageTags().isEmpty()); + assertFalse("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty()); + assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers())); + + // verify messages were not dispatched + for (UnprocessedMessage message: messages ) + { + verify(message, never()).dispatch(session); + } + + listener = new MockReceiveConnectionListener(_connection, 1, + new ExchangeDeclareOkBody(), // first producer resubscribe + new ExchangeDeclareOkBody(), // second producer resubscribe + new ExchangeDeclareOkBody(), new BasicConsumeOkBody(AMQShortString.valueOf("1")), // first consumer resubscribe + new ExchangeDeclareOkBody(), new BasicConsumeOkBody(AMQShortString.valueOf("2"))); // second consumer resubscribe + _connection.setConnectionListener(listener); + + session.resubscribe(); + + assertTrue("Not all expected commands have been sent on session resubscribe", listener.responsesEmpty()); + + assertEquals("Unexpected highest delivery tag", -1, session.getHighestDeliveryTag().get()); + assertTrue("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty()); + assertTrue("Unexpected delivered message tags", session.getDeliveredMessageTags().isEmpty()); + assertTrue("Unexpected pre-fetched message tags", session.getPrefetchedMessageTags().isEmpty()); + assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers())); + + // verify dispatcher queue is drained + for (UnprocessedMessage message: messages ) + { + verify(message).dispatch(session); + } + } + + private UnprocessedMessage createMockMessage(long deliveryTag, int consumerTag) + { + UnprocessedMessage message = mock(UnprocessedMessage.class); + when(message.getConsumerTag()).thenReturn(consumerTag); + when(message.getDeliveryTag()).thenReturn(deliveryTag); + return message; + } + + static class MockReceiveConnectionListener extends ConnectionListenerSupport + { + private final AMQConnection _connection; + private final List<AMQBody> _responses; + private final int _channelId; + + MockReceiveConnectionListener(AMQConnection connection, int channelId, AMQBody... response) + { + _connection = connection; + _responses = new ArrayList<>(Arrays.asList(response)); + _channelId = channelId; + } + + @Override + public void bytesSent(long count) + { + try + { + AMQBody response = _responses.remove(0); + _connection.getProtocolHandler().methodBodyReceived(_channelId, response); + } + catch (QpidException e) + { + throw new RuntimeException(e); + } + } + + public boolean responsesEmpty() + { + return _responses.isEmpty(); + } + } } Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1693542&r1=1693541&r2=1693542&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Fri Jul 31 08:34:36 2015 @@ -27,7 +27,15 @@ import org.apache.qpid.client.AMQSession import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; +import org.apache.qpid.server.management.plugin.HttpManagement; +import org.apache.qpid.server.model.AuthenticationProvider; +import org.apache.qpid.server.model.Plugin; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.systest.rest.RestTestHelper; import org.apache.qpid.test.utils.FailoverBaseCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.test.utils.TestUtils; import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +56,7 @@ import javax.jms.TextMessage; import javax.jms.TransactionRolledBackException; import javax.naming.NamingException; +import java.io.IOException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collections; @@ -1024,6 +1033,184 @@ public class FailoverBehaviourTest exten } } + public void testFailoverWhenConnectionStopped() throws Exception + { + // not needed + _connection.close(); + + // not needed + stopBroker(getFailingPort()); + + // stop broker and add http management + stopBroker(); + configureHttpManagement(); + startBroker(); + + _connection = createConnectionWithFailover(); + init(Session.SESSION_TRANSACTED, true); + + // populate broker with initial messages + final int testMessageNumber = 10; + produceMessages(TEST_MESSAGE_FORMAT, testMessageNumber, false); + _producerSession.commit(); + + final CountDownLatch stopFlag = new CountDownLatch(1); + final CountDownLatch consumerBlocker = new CountDownLatch(1); + final AtomicReference<Exception> exception = new AtomicReference<>(); + final CountDownLatch messageCounter = new CountDownLatch(testMessageNumber); + _consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message message) + { + if (consumerBlocker.getCount() == 1) + { + try + { + consumerBlocker.await(); + + _LOGGER.debug("Stopping connection from dispatcher thread"); + _connection.stop(); + _LOGGER.debug("Connection stopped from dispatcher thread"); + stopFlag.countDown(); + } + catch (Exception e) + { + exception.set(e); + } + } + + try + { + _consumerSession.commit(); + messageCounter.countDown(); + } + catch (Exception e) + { + exception.set(e); + } + } + }); + + int unacknowledgedMessageNumber = getUnacknowledgedMessageNumber(testMessageNumber); + + assertEquals("Unexpected number of unacknowledged messages", testMessageNumber, unacknowledgedMessageNumber); + + // stop blocking dispatcher thread + consumerBlocker.countDown(); + + boolean stopResult = stopFlag.await(2000, TimeUnit.MILLISECONDS); + _LOGGER.debug("Thread dump:" + TestUtils.dumpThreads()); + assertTrue("Connection was not stopped" + (exception.get() == null ? "." : ":" + exception.get().getMessage()), + stopResult); + assertNull("Unexpected exception on stop :" + exception.get(), exception.get()); + closeConnectionViaManagement(); + + // wait for failover to complete + awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME); + assertFailoverException(); + + // publish more messages when connection stopped + produceMessages(TEST_MESSAGE_FORMAT, 2, false); + _producerSession.commit(); + + _connection.start(); + + assertTrue("Not all messages were delivered. Remaining message number " + messageCounter.getCount(), messageCounter.await(11000, TimeUnit.MILLISECONDS)); + _connection.close(); + } + + private int getUnacknowledgedMessageNumber(int testMessageNumber) throws IOException, InterruptedException + { + int unacknowledgedMessageNumber = 0; + int i =0; + do + { + unacknowledgedMessageNumber = getUnacknowledgedMessageNumber(); + if (unacknowledgedMessageNumber != testMessageNumber) + { + Thread.sleep(50); + } + else + { + break; + } + } + while (i++ < 20); + return unacknowledgedMessageNumber; + } + + private void configureHttpManagement() + { + TestBrokerConfiguration config = getBrokerConfiguration(); + config.addHttpManagementConfiguration(); + String initialConfiguration = System.getProperty("virtualhostnode.context.blueprint"); + if (initialConfiguration != null) + { + config.setObjectAttribute(VirtualHostNode.class, "test", VirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, initialConfiguration); + } + config.setObjectAttribute(AuthenticationProvider.class, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER, + "secureOnlyMechanisms", + "{}"); + + + // set password authentication provider on http port for the tests + config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.AUTHENTICATION_PROVIDER, + TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER); + config.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true); + config.setSaved(false); + } + + private void closeConnectionViaManagement() throws IOException + { + RestTestHelper restTestHelper = new RestTestHelper(getHttpManagementPort(getPort(0))); + try + { + restTestHelper.setUsernameAndPassword("webadmin", "webadmin"); + List<Map<String, Object>> connections = restTestHelper.getJsonAsList("virtualhost/test/test/getConnections"); + assertEquals("Unexpected number of connections", 1, connections.size()); + Map<String, Object> connection = connections.get(0); + String connectionName = (String) connection.get(org.apache.qpid.server.model.Connection.NAME); + restTestHelper.submitRequest("connection/" + TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT + "/" + restTestHelper.encodeAsUTF( connectionName ), "DELETE", 200); + } + finally + { + restTestHelper.tearDown(); + } + } + + private int getUnacknowledgedMessageNumber() throws IOException + { + RestTestHelper restTestHelper = new RestTestHelper(getHttpManagementPort(getPort(0))); + try + { + restTestHelper.setUsernameAndPassword("webadmin", "webadmin"); + List<Map<String, Object>> sessions = restTestHelper.getJsonAsList("session"); + for(Map<String, Object> session: sessions ) + { + List<Map<String, Object>> consumers = (List<Map<String, Object>>)session.get("consumers"); + if (consumers != null) + { + Map<String, Object> consumer = consumers.get(0); + Map<String, Object> stat = (Map<String, Object>)consumer.get("statistics"); + if (stat != null) + { + Number unacknowledgedMessages = (Number)stat.get("unacknowledgedMessages"); + if (unacknowledgedMessages != null) + { + return unacknowledgedMessages.intValue(); + } + } + } + } + return 0; + } + finally + { + restTestHelper.tearDown(); + } + } + private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception { final Map<String, Object> arguments = new HashMap<String, Object>(); Modified: qpid/java/trunk/test-profiles/cpp.excludes URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/cpp.excludes?rev=1693542&r1=1693541&r2=1693542&view=diff ============================================================================== --- qpid/java/trunk/test-profiles/cpp.excludes (original) +++ qpid/java/trunk/test-profiles/cpp.excludes Fri Jul 31 08:34:36 2015 @@ -29,3 +29,6 @@ org.apache.qpid.test.client.message.JMSD //BDB System Tests org.apache.qpid.server.store.berkeleydb.* + +// test relies on Java Broker REST interfaces +org.apache.qpid.client.failover.FailoverBehaviourTest.testFailoverWhenConnectionStopped --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org