Repository: qpid-jms Updated Branches: refs/heads/master 0317483d4 -> 6485baab5
Remove use of the default message factory from the FailoverProvider and make it dependent on the connected instance for the message factory instance. Also adds a new event onConnectionEstablished which allows the providers to know when chained that the next one in the series has connected and allows for a client level event to indicate same. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6485baab Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6485baab Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6485baab Branch: refs/heads/master Commit: 6485baab519cce0cf5ebcdd6b6378d371548bb34 Parents: 0317483 Author: Timothy Bish <tabish...@gmail.com> Authored: Mon Sep 29 16:34:26 2014 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Mon Sep 29 16:34:26 2014 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 19 +++- .../apache/qpid/jms/JmsConnectionListener.java | 11 ++ .../java/org/apache/qpid/jms/JmsSession.java | 22 ++-- .../qpid/jms/provider/AbstractProvider.java | 7 ++ .../jms/provider/DefaultProviderListener.java | 4 + .../qpid/jms/provider/ProviderListener.java | 10 ++ .../qpid/jms/provider/ProviderWrapper.java | 5 + .../qpid/jms/provider/amqp/AmqpProvider.java | 21 +++- .../jms/provider/failover/FailoverProvider.java | 111 ++++++++++++------- .../jms/discovery/JmsAmqpDiscoveryTest.java | 8 ++ 10 files changed, 163 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index b4d03af..cc2f979 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -64,9 +64,9 @@ import org.apache.qpid.jms.meta.JmsSessionId; import org.apache.qpid.jms.meta.JmsTransactionId; import org.apache.qpid.jms.provider.Provider; import org.apache.qpid.jms.provider.ProviderClosedException; +import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFuture; import org.apache.qpid.jms.provider.ProviderListener; -import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.util.IdGenerator; import org.apache.qpid.jms.util.ThreadPoolUtils; import org.slf4j.Logger; @@ -525,7 +525,6 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti this.connectionInfo = createResource(connectionInfo); this.connected.set(true); - this.messageFactory = provider.getMessageFactory(); // TODO - Advisory Support. // @@ -977,6 +976,9 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } public JmsMessageFactory getMessageFactory() { + if (messageFactory == null) { + throw new RuntimeException("Message factory should never be null"); + } return messageFactory; } @@ -1060,6 +1062,19 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection {} connected to remote Broker: {}", connectionInfo.getConnectionId(), remoteURI); + this.messageFactory = provider.getMessageFactory(); + + // TODO - For events triggered from the Provider thread, we might want to consider always + // firing the client level events on the Connection executor to prevent the client + // from stalling the provider thread. + for (JmsConnectionListener listener : connectionListeners) { + listener.onConnectionEstablished(remoteURI); + } + } + + @Override public void onConnectionFailure(final IOException ex) { onAsyncException(ex); if (!closing.get() && !closed.get()) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java index 2439760..d1a2663 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java @@ -27,6 +27,17 @@ import org.apache.qpid.jms.message.JmsInboundMessageDispatch; public interface JmsConnectionListener { /** + * Called when a connection has been successfully established. + * + * This method is never called more than once when using a fault tolerant + * connection, instead the connection will signal interrupted and restored. + * + * @param remoteURI + * The URI of the Broker this client is now connected to. + */ + void onConnectionEstablished(URI remoteURI); + + /** * Called when an unrecoverable error occurs and the Connection must be closed. * * @param error http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 3dbbaf2..7a87e04 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -61,7 +61,6 @@ import javax.jms.TopicSubscriber; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.message.JmsMessage; -import org.apache.qpid.jms.message.JmsMessageFactory; import org.apache.qpid.jms.message.JmsMessageTransformation; import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; import org.apache.qpid.jms.meta.JmsConsumerId; @@ -97,7 +96,6 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa private final AtomicLong consumerIdGenerator = new AtomicLong(); private final AtomicLong producerIdGenerator = new AtomicLong(); private JmsLocalTransactionContext transactionContext; - private JmsMessageFactory messageFactory; protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException { this.connection = connection; @@ -111,7 +109,6 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa this.sessionInfo.setSendAcksAsync(connection.isSendAcksAsync()); this.sessionInfo = connection.createResource(sessionInfo); - this.messageFactory = connection.getMessageFactory(); } int acknowledgementMode() { @@ -493,49 +490,49 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa @Override public BytesMessage createBytesMessage() throws JMSException { checkClosed(); - return init(messageFactory.createBytesMessage()); + return init(connection.getMessageFactory().createBytesMessage()); } @Override public MapMessage createMapMessage() throws JMSException { checkClosed(); - return init(messageFactory.createMapMessage()); + return init(connection.getMessageFactory().createMapMessage()); } @Override public Message createMessage() throws JMSException { checkClosed(); - return init(messageFactory.createMessage()); + return init(connection.getMessageFactory().createMessage()); } @Override public ObjectMessage createObjectMessage() throws JMSException { checkClosed(); - return init(messageFactory.createObjectMessage(null)); + return init(connection.getMessageFactory().createObjectMessage(null)); } @Override public ObjectMessage createObjectMessage(Serializable object) throws JMSException { checkClosed(); - return init(messageFactory.createObjectMessage(object)); + return init(connection.getMessageFactory().createObjectMessage(object)); } @Override public StreamMessage createStreamMessage() throws JMSException { checkClosed(); - return init(messageFactory.createStreamMessage()); + return init(connection.getMessageFactory().createStreamMessage()); } @Override public TextMessage createTextMessage() throws JMSException { checkClosed(); - return init(messageFactory.createTextMessage(null)); + return init(connection.getMessageFactory().createTextMessage(null)); } @Override public TextMessage createTextMessage(String text) throws JMSException { checkClosed(); - return init(messageFactory.createTextMessage(text)); + return init(connection.getMessageFactory().createTextMessage(text)); } ////////////////////////////////////////////////////////////////////////// @@ -937,9 +934,6 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa } protected void onConnectionRecovered(Provider provider) throws Exception { - - this.messageFactory = provider.getMessageFactory(); - for (JmsMessageProducer producer : producers) { producer.onConnectionRecovered(provider); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java index 9d743d2..9856aa8 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java @@ -78,6 +78,13 @@ public abstract class AbstractProvider implements Provider { return remoteURI; } + public void fireConnectionEstablished() { + ProviderListener listener = this.listener; + if (listener != null) { + listener.onConnectionEstablished(remoteURI); + } + } + public void fireProviderException(Throwable ex) { ProviderListener listener = this.listener; if (listener != null) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java index e0d3e01..4477121 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java @@ -35,6 +35,10 @@ public class DefaultProviderListener implements ProviderListener { } @Override + public void onConnectionEstablished(URI remoteURI) { + } + + @Override public void onConnectionFailure(IOException ex) { } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java index 18938d5..bcab614 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java @@ -91,6 +91,16 @@ public interface ProviderListener { void onConnectionRestored(URI remoteURI); /** + * Called to indicate that the underlying connection to the Broker has been established + * for the first time. For a fault tolerant provider this event should only ever be + * triggered once with the interruption and recovery events following on for future + * + * @param ex + * The exception that indicates the cause of this Provider failure. + */ + void onConnectionEstablished(URI remoteURI); + + /** * Called to indicate that the underlying connection to the Broker has been lost and * the Provider will not perform any reconnect. Following this call the provider is * in a failed state and further calls to it will throw an Exception. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java index 1381dba..e03f5fa 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java @@ -166,6 +166,11 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi } @Override + public void onConnectionEstablished(URI remoteURI) { + this.listener.onConnectionEstablished(this.next.getRemoteURI()); + } + + @Override public void onConnectionFailure(IOException ex) { this.listener.onConnectionInterrupted(this.next.getRemoteURI()); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index aad47f1..259948d 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -42,8 +42,8 @@ import org.apache.qpid.jms.meta.JmsSessionInfo; import org.apache.qpid.jms.meta.JmsTransactionInfo; import org.apache.qpid.jms.provider.AbstractProvider; import org.apache.qpid.jms.provider.AsyncResult; -import org.apache.qpid.jms.provider.ProviderFuture; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; +import org.apache.qpid.jms.provider.ProviderFuture; import org.apache.qpid.jms.transports.TcpTransport; import org.apache.qpid.jms.transports.TransportListener; import org.apache.qpid.jms.util.IOExceptionSupport; @@ -243,7 +243,24 @@ public class AmqpProvider extends AbstractProvider implements TransportListener sasl.client(); } connection = new AmqpConnection(AmqpProvider.this, protonConnection, sasl, connectionInfo); - connection.open(request); + connection.open(new AsyncResult() { + + @Override + public void onSuccess() { + fireConnectionEstablished(); + request.onSuccess(); + } + + @Override + public void onFailure(Throwable result) { + request.onFailure(result); + } + + @Override + public boolean isComplete() { + return request.isComplete(); + } + }); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java index 2f4db2c..ca8a5e6 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.jms.JMSException; import org.apache.qpid.jms.JmsSslContext; -import org.apache.qpid.jms.message.JmsDefaultMessageFactory; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.message.JmsMessageFactory; import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; @@ -45,10 +44,10 @@ import org.apache.qpid.jms.meta.JmsSessionId; import org.apache.qpid.jms.provider.AsyncResult; import org.apache.qpid.jms.provider.DefaultProviderListener; import org.apache.qpid.jms.provider.Provider; +import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFactory; import org.apache.qpid.jms.provider.ProviderFuture; import org.apache.qpid.jms.provider.ProviderListener; -import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.util.IOExceptionSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +76,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide private final Map<Long, FailoverRequest> requests = new LinkedHashMap<Long, FailoverRequest>(); private final DefaultProviderListener closedListener = new DefaultProviderListener(); private final JmsSslContext sslContext; - private final JmsMessageFactory defaultMessageFactory = new JmsDefaultMessageFactory(); + private final AtomicReference<JmsMessageFactory> messageFactory = new AtomicReference<JmsMessageFactory>(); // Current state of connection / reconnection private boolean firstConnection = true; @@ -205,20 +204,30 @@ public class FailoverProvider extends DefaultProviderListener implements Provide @Override public void create(final JmsResource resource, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { checkClosed(); - final FailoverRequest pending = new FailoverRequest(request) { - @Override - public void doTask() throws Exception { - if (resource instanceof JmsConnectionInfo) { - JmsConnectionInfo connectionInfo = (JmsConnectionInfo) resource; - connectTimeout = connectionInfo.getConnectTimeout(); - closeTimeout = connectionInfo.getCloseTimeout(); - sendTimeout = connectionInfo.getSendTimeout(); - requestTimeout = connectionInfo.getRequestTimeout(); - } + FailoverRequest pending = null; + if (resource instanceof JmsConnectionInfo) { + pending = new CreateConnectionRequest(request) { + @Override + public void doTask() throws Exception { + if (resource instanceof JmsConnectionInfo) { + JmsConnectionInfo connectionInfo = (JmsConnectionInfo) resource; + connectTimeout = connectionInfo.getConnectTimeout(); + closeTimeout = connectionInfo.getCloseTimeout(); + sendTimeout = connectionInfo.getSendTimeout(); + requestTimeout = connectionInfo.getRequestTimeout(); + } - provider.create(resource, this); - } - }; + provider.create(resource, this); + } + }; + } else { + pending = new FailoverRequest(request) { + @Override + public void doTask() throws Exception { + provider.create(resource, this); + } + }; + } serializer.execute(pending); } @@ -389,20 +398,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide @Override public JmsMessageFactory getMessageFactory() { - final AtomicReference<JmsMessageFactory> result = - new AtomicReference<JmsMessageFactory>(defaultMessageFactory); - - serializer.execute(new Runnable() { - - @Override - public void run() { - if (provider != null) { - result.set(provider.getMessageFactory()); - } - } - }); - - return result.get(); + return messageFactory.get(); } //--------------- Connection Error and Recovery methods ------------------// @@ -415,6 +411,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide * is allowed and if so a new reconnect cycle is triggered on the connection thread. * * @param cause + * the error that triggered the failure of the provider. */ private void handleProviderFailure(final IOException cause) { LOG.debug("handling Provider failure: {}", cause.getMessage()); @@ -445,7 +442,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide /** * Called from the reconnection thread. This method enqueues a new task that - * will attempt to recover connection state, once successful normal operations + * will attempt to recover connection state, once successful, normal operations * will resume. If an error occurs while attempting to recover the JMS framework * state then a reconnect cycle is again triggered on the connection thread. * @@ -458,10 +455,10 @@ public class FailoverProvider extends DefaultProviderListener implements Provide public void run() { try { if (firstConnection) { - firstConnection = false; FailoverProvider.this.provider = provider; provider.setProviderListener(FailoverProvider.this); + // The only pending request here should be a Connection create request. List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values()); for (FailoverRequest request : pending) { request.run(); @@ -474,13 +471,16 @@ public class FailoverProvider extends DefaultProviderListener implements Provide // Stage 1: Recovery all JMS Framework resources listener.onConnectionRecovery(provider); - // Stage 2: Restart consumers, send pull commands, etc. + // Stage 2: Connection state recovered, get newly configured message factory. + FailoverProvider.this.messageFactory.set(provider.getMessageFactory()); + + // Stage 3: Restart consumers, send pull commands, etc. listener.onConnectionRecovered(provider); - // Stage 3: Let the client know that connection has restored. + // Stage 4: Let the client know that connection has restored. listener.onConnectionRestored(provider.getRemoteURI()); - // Stage 4: Send pending actions. + // Stage 5: Send pending actions. List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values()); for (FailoverRequest request : pending) { request.run(); @@ -771,8 +771,6 @@ public class FailoverProvider extends DefaultProviderListener implements Provide * For all requests that are dispatched from the FailoverProvider to a connected * Provider instance an instance of FailoverRequest is used to handle errors that * occur during processing of that request and trigger a reconnect. - * - * @param <T> */ protected abstract class FailoverRequest extends ProviderFuture implements Runnable { @@ -854,4 +852,43 @@ public class FailoverProvider extends DefaultProviderListener implements Provide return false; } } + + /** + * Captures the initial request to create a JmsConnectionInfo based resources and ensures + * that if the connection is successfully established that the connection established event + * is triggered once before moving on to sending only connection interrupted and restored + * events. + */ + protected abstract class CreateConnectionRequest extends FailoverRequest { + + /** + * @param watcher + */ + public CreateConnectionRequest(AsyncResult watcher) { + super(watcher); + } + + @Override + public void onSuccess() { + serializer.execute(new Runnable() { + @Override + public void run() { + if (firstConnection) { + LOG.trace("First connection requst has completed:"); + FailoverProvider.this.messageFactory.set(provider.getMessageFactory()); + listener.onConnectionEstablished(provider.getRemoteURI()); + firstConnection = false; + } else { + LOG.warn("A second call to a CreateConnectionRequest not expected."); + } + + CreateConnectionRequest.this.signalConnected(); + } + }); + } + + public void signalConnected() { + super.onSuccess(); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java index 77cf6ce..4945a92 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java @@ -42,6 +42,7 @@ public class JmsAmqpDiscoveryTest extends AmqpTestSupport implements JmsConnecti private static final Logger LOG = LoggerFactory.getLogger(JmsAmqpDiscoveryTest.class); + private CountDownLatch connected; private CountDownLatch interrupted; private CountDownLatch restored; private JmsConnection jmsConnection; @@ -51,6 +52,7 @@ public class JmsAmqpDiscoveryTest extends AmqpTestSupport implements JmsConnecti public void setUp() throws Exception { super.setUp(); + connected = new CountDownLatch(1); interrupted = new CountDownLatch(1); restored = new CountDownLatch(1); } @@ -148,6 +150,12 @@ public class JmsAmqpDiscoveryTest extends AmqpTestSupport implements JmsConnecti } @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection reports established. Connected to -> {}", remoteURI); + connected.countDown(); + } + + @Override public void onConnectionInterrupted(URI remoteURI) { LOG.info("Connection reports interrupted. Lost connection to -> {}", remoteURI); interrupted.countDown(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org