Repository: qpid-jms
Updated Branches:
  refs/heads/master 0317483d4 -> 6485baab5


Remove use of the default message factory from the FailoverProvider and
make it dependent on the connected instance for the message factory
instance.  Also adds a new event onConnectionEstablished which allows
the providers to know when chained that the next one in the series has
connected and allows for a client level event to indicate same.  

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6485baab
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6485baab
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6485baab

Branch: refs/heads/master
Commit: 6485baab519cce0cf5ebcdd6b6378d371548bb34
Parents: 0317483
Author: Timothy Bish <tabish...@gmail.com>
Authored: Mon Sep 29 16:34:26 2014 -0400
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Mon Sep 29 16:34:26 2014 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |  19 +++-
 .../apache/qpid/jms/JmsConnectionListener.java  |  11 ++
 .../java/org/apache/qpid/jms/JmsSession.java    |  22 ++--
 .../qpid/jms/provider/AbstractProvider.java     |   7 ++
 .../jms/provider/DefaultProviderListener.java   |   4 +
 .../qpid/jms/provider/ProviderListener.java     |  10 ++
 .../qpid/jms/provider/ProviderWrapper.java      |   5 +
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  21 +++-
 .../jms/provider/failover/FailoverProvider.java | 111 ++++++++++++-------
 .../jms/discovery/JmsAmqpDiscoveryTest.java     |   8 ++
 10 files changed, 163 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index b4d03af..cc2f979 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -64,9 +64,9 @@ import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderClosedException;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderListener;
-import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.util.IdGenerator;
 import org.apache.qpid.jms.util.ThreadPoolUtils;
 import org.slf4j.Logger;
@@ -525,7 +525,6 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
 
             this.connectionInfo = createResource(connectionInfo);
             this.connected.set(true);
-            this.messageFactory = provider.getMessageFactory();
 
             // TODO - Advisory Support.
             //
@@ -977,6 +976,9 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     }
 
     public JmsMessageFactory getMessageFactory() {
+        if (messageFactory == null) {
+            throw new RuntimeException("Message factory should never be null");
+        }
         return messageFactory;
     }
 
@@ -1060,6 +1062,19 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
     }
 
     @Override
+    public void onConnectionEstablished(URI remoteURI) {
+        LOG.info("Connection {} connected to remote Broker: {}", 
connectionInfo.getConnectionId(), remoteURI);
+        this.messageFactory = provider.getMessageFactory();
+
+        // TODO - For events triggered from the Provider thread, we might want 
to consider always
+        //        firing the client level events on the Connection executor to 
prevent the client
+        //        from stalling the provider thread.
+        for (JmsConnectionListener listener : connectionListeners) {
+            listener.onConnectionEstablished(remoteURI);
+        }
+    }
+
+    @Override
     public void onConnectionFailure(final IOException ex) {
         onAsyncException(ex);
         if (!closing.get() && !closed.get()) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
index 2439760..d1a2663 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
@@ -27,6 +27,17 @@ import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 public interface JmsConnectionListener {
 
     /**
+     * Called when a connection has been successfully established.
+     *
+     * This method is never called more than once when using a fault tolerant
+     * connection, instead the connection will signal interrupted and restored.
+     *
+     * @param remoteURI
+     *        The URI of the Broker this client is now connected to.
+     */
+    void onConnectionEstablished(URI remoteURI);
+
+    /**
      * Called when an unrecoverable error occurs and the Connection must be 
closed.
      *
      * @param error

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 3dbbaf2..7a87e04 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -61,7 +61,6 @@ import javax.jms.TopicSubscriber;
 
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsMessage;
-import org.apache.qpid.jms.message.JmsMessageFactory;
 import org.apache.qpid.jms.message.JmsMessageTransformation;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsConsumerId;
@@ -97,7 +96,6 @@ public class JmsSession implements Session, QueueSession, 
TopicSession, JmsMessa
     private final AtomicLong consumerIdGenerator = new AtomicLong();
     private final AtomicLong producerIdGenerator = new AtomicLong();
     private JmsLocalTransactionContext transactionContext;
-    private JmsMessageFactory messageFactory;
 
     protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int 
acknowledgementMode) throws JMSException {
         this.connection = connection;
@@ -111,7 +109,6 @@ public class JmsSession implements Session, QueueSession, 
TopicSession, JmsMessa
         this.sessionInfo.setSendAcksAsync(connection.isSendAcksAsync());
 
         this.sessionInfo = connection.createResource(sessionInfo);
-        this.messageFactory = connection.getMessageFactory();
     }
 
     int acknowledgementMode() {
@@ -493,49 +490,49 @@ public class JmsSession implements Session, QueueSession, 
TopicSession, JmsMessa
     @Override
     public BytesMessage createBytesMessage() throws JMSException {
         checkClosed();
-        return init(messageFactory.createBytesMessage());
+        return init(connection.getMessageFactory().createBytesMessage());
     }
 
     @Override
     public MapMessage createMapMessage() throws JMSException {
         checkClosed();
-        return init(messageFactory.createMapMessage());
+        return init(connection.getMessageFactory().createMapMessage());
     }
 
     @Override
     public Message createMessage() throws JMSException {
         checkClosed();
-        return init(messageFactory.createMessage());
+        return init(connection.getMessageFactory().createMessage());
     }
 
     @Override
     public ObjectMessage createObjectMessage() throws JMSException {
         checkClosed();
-        return init(messageFactory.createObjectMessage(null));
+        return init(connection.getMessageFactory().createObjectMessage(null));
     }
 
     @Override
     public ObjectMessage createObjectMessage(Serializable object) throws 
JMSException {
         checkClosed();
-        return init(messageFactory.createObjectMessage(object));
+        return 
init(connection.getMessageFactory().createObjectMessage(object));
     }
 
     @Override
     public StreamMessage createStreamMessage() throws JMSException {
         checkClosed();
-        return init(messageFactory.createStreamMessage());
+        return init(connection.getMessageFactory().createStreamMessage());
     }
 
     @Override
     public TextMessage createTextMessage() throws JMSException {
         checkClosed();
-        return init(messageFactory.createTextMessage(null));
+        return init(connection.getMessageFactory().createTextMessage(null));
     }
 
     @Override
     public TextMessage createTextMessage(String text) throws JMSException {
         checkClosed();
-        return init(messageFactory.createTextMessage(text));
+        return init(connection.getMessageFactory().createTextMessage(text));
     }
 
     //////////////////////////////////////////////////////////////////////////
@@ -937,9 +934,6 @@ public class JmsSession implements Session, QueueSession, 
TopicSession, JmsMessa
     }
 
     protected void onConnectionRecovered(Provider provider) throws Exception {
-
-        this.messageFactory = provider.getMessageFactory();
-
         for (JmsMessageProducer producer : producers) {
             producer.onConnectionRecovered(provider);
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
index 9d743d2..9856aa8 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
@@ -78,6 +78,13 @@ public abstract class AbstractProvider implements Provider {
         return remoteURI;
     }
 
+    public void fireConnectionEstablished() {
+        ProviderListener listener = this.listener;
+        if (listener != null) {
+            listener.onConnectionEstablished(remoteURI);
+        }
+    }
+
     public void fireProviderException(Throwable ex) {
         ProviderListener listener = this.listener;
         if (listener != null) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
index e0d3e01..4477121 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
@@ -35,6 +35,10 @@ public class DefaultProviderListener implements 
ProviderListener {
     }
 
     @Override
+    public void onConnectionEstablished(URI remoteURI) {
+    }
+
+    @Override
     public void onConnectionFailure(IOException ex) {
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
index 18938d5..bcab614 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
@@ -91,6 +91,16 @@ public interface ProviderListener {
     void onConnectionRestored(URI remoteURI);
 
     /**
+     * Called to indicate that the underlying connection to the Broker has 
been established
+     * for the first time.  For a fault tolerant provider this event should 
only ever be
+     * triggered once with the interruption and recovery events following on 
for future
+     *
+     * @param ex
+     *        The exception that indicates the cause of this Provider failure.
+     */
+    void onConnectionEstablished(URI remoteURI);
+
+    /**
      * Called to indicate that the underlying connection to the Broker has 
been lost and
      * the Provider will not perform any reconnect.  Following this call the 
provider is
      * in a failed state and further calls to it will throw an Exception.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
index 1381dba..e03f5fa 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -166,6 +166,11 @@ public class ProviderWrapper<E extends Provider> 
implements Provider, ProviderLi
     }
 
     @Override
+    public void onConnectionEstablished(URI remoteURI) {
+        this.listener.onConnectionEstablished(this.next.getRemoteURI());
+    }
+
+    @Override
     public void onConnectionFailure(IOException ex) {
         this.listener.onConnectionInterrupted(this.next.getRemoteURI());
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index aad47f1..259948d 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -42,8 +42,8 @@ import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.meta.JmsTransactionInfo;
 import org.apache.qpid.jms.provider.AbstractProvider;
 import org.apache.qpid.jms.provider.AsyncResult;
-import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.transports.TcpTransport;
 import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.util.IOExceptionSupport;
@@ -243,7 +243,24 @@ public class AmqpProvider extends AbstractProvider 
implements TransportListener
                                 sasl.client();
                             }
                             connection = new AmqpConnection(AmqpProvider.this, 
protonConnection, sasl, connectionInfo);
-                            connection.open(request);
+                            connection.open(new AsyncResult() {
+
+                                @Override
+                                public void onSuccess() {
+                                    fireConnectionEstablished();
+                                    request.onSuccess();
+                                }
+
+                                @Override
+                                public void onFailure(Throwable result) {
+                                    request.onFailure(result);
+                                }
+
+                                @Override
+                                public boolean isComplete() {
+                                    return request.isComplete();
+                                }
+                            });
                         }
 
                         @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 2f4db2c..ca8a5e6 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.jms.JMSException;
 
 import org.apache.qpid.jms.JmsSslContext;
-import org.apache.qpid.jms.message.JmsDefaultMessageFactory;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsMessageFactory;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
@@ -45,10 +44,10 @@ import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.DefaultProviderListener;
 import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFactory;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderListener;
-import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,7 +76,7 @@ public class FailoverProvider extends DefaultProviderListener 
implements Provide
     private final Map<Long, FailoverRequest> requests = new 
LinkedHashMap<Long, FailoverRequest>();
     private final DefaultProviderListener closedListener = new 
DefaultProviderListener();
     private final JmsSslContext sslContext;
-    private final JmsMessageFactory defaultMessageFactory = new 
JmsDefaultMessageFactory();
+    private final AtomicReference<JmsMessageFactory> messageFactory = new 
AtomicReference<JmsMessageFactory>();
 
     // Current state of connection / reconnection
     private boolean firstConnection = true;
@@ -205,20 +204,30 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
     @Override
     public void create(final JmsResource resource, AsyncResult request) throws 
IOException, JMSException, UnsupportedOperationException {
         checkClosed();
-        final FailoverRequest pending = new FailoverRequest(request) {
-            @Override
-            public void doTask() throws Exception {
-                if (resource instanceof JmsConnectionInfo) {
-                    JmsConnectionInfo connectionInfo = (JmsConnectionInfo) 
resource;
-                    connectTimeout = connectionInfo.getConnectTimeout();
-                    closeTimeout = connectionInfo.getCloseTimeout();
-                    sendTimeout = connectionInfo.getSendTimeout();
-                    requestTimeout = connectionInfo.getRequestTimeout();
-                }
+        FailoverRequest pending = null;
+        if (resource instanceof JmsConnectionInfo) {
+            pending = new CreateConnectionRequest(request) {
+                @Override
+                public void doTask() throws Exception {
+                    if (resource instanceof JmsConnectionInfo) {
+                        JmsConnectionInfo connectionInfo = (JmsConnectionInfo) 
resource;
+                        connectTimeout = connectionInfo.getConnectTimeout();
+                        closeTimeout = connectionInfo.getCloseTimeout();
+                        sendTimeout = connectionInfo.getSendTimeout();
+                        requestTimeout = connectionInfo.getRequestTimeout();
+                    }
 
-                provider.create(resource, this);
-            }
-        };
+                    provider.create(resource, this);
+                }
+            };
+        } else {
+            pending = new FailoverRequest(request) {
+                @Override
+                public void doTask() throws Exception {
+                    provider.create(resource, this);
+                }
+            };
+        }
 
         serializer.execute(pending);
     }
@@ -389,20 +398,7 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
 
     @Override
     public JmsMessageFactory getMessageFactory() {
-        final AtomicReference<JmsMessageFactory> result =
-            new AtomicReference<JmsMessageFactory>(defaultMessageFactory);
-
-        serializer.execute(new Runnable() {
-
-            @Override
-            public void run() {
-                if (provider != null) {
-                    result.set(provider.getMessageFactory());
-                }
-            }
-        });
-
-        return result.get();
+        return messageFactory.get();
     }
 
     //--------------- Connection Error and Recovery methods 
------------------//
@@ -415,6 +411,7 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
      * is allowed and if so a new reconnect cycle is triggered on the 
connection thread.
      *
      * @param cause
+     *        the error that triggered the failure of the provider.
      */
     private void handleProviderFailure(final IOException cause) {
         LOG.debug("handling Provider failure: {}", cause.getMessage());
@@ -445,7 +442,7 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
 
     /**
      * Called from the reconnection thread.  This method enqueues a new task 
that
-     * will attempt to recover connection state, once successful normal 
operations
+     * will attempt to recover connection state, once successful, normal 
operations
      * will resume.  If an error occurs while attempting to recover the JMS 
framework
      * state then a reconnect cycle is again triggered on the connection 
thread.
      *
@@ -458,10 +455,10 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
             public void run() {
                 try {
                     if (firstConnection) {
-                        firstConnection = false;
                         FailoverProvider.this.provider = provider;
                         provider.setProviderListener(FailoverProvider.this);
 
+                        // The only pending request here should be a 
Connection create request.
                         List<FailoverRequest> pending = new 
ArrayList<FailoverRequest>(requests.values());
                         for (FailoverRequest request : pending) {
                             request.run();
@@ -474,13 +471,16 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
                         // Stage 1: Recovery all JMS Framework resources
                         listener.onConnectionRecovery(provider);
 
-                        // Stage 2: Restart consumers, send pull commands, etc.
+                        // Stage 2: Connection state recovered, get newly 
configured message factory.
+                        
FailoverProvider.this.messageFactory.set(provider.getMessageFactory());
+
+                        // Stage 3: Restart consumers, send pull commands, etc.
                         listener.onConnectionRecovered(provider);
 
-                        // Stage 3: Let the client know that connection has 
restored.
+                        // Stage 4: Let the client know that connection has 
restored.
                         listener.onConnectionRestored(provider.getRemoteURI());
 
-                        // Stage 4: Send pending actions.
+                        // Stage 5: Send pending actions.
                         List<FailoverRequest> pending = new 
ArrayList<FailoverRequest>(requests.values());
                         for (FailoverRequest request : pending) {
                             request.run();
@@ -771,8 +771,6 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
      * For all requests that are dispatched from the FailoverProvider to a 
connected
      * Provider instance an instance of FailoverRequest is used to handle 
errors that
      * occur during processing of that request and trigger a reconnect.
-     *
-     * @param <T>
      */
     protected abstract class FailoverRequest extends ProviderFuture implements 
Runnable {
 
@@ -854,4 +852,43 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
             return false;
         }
     }
+
+    /**
+     * Captures the initial request to create a JmsConnectionInfo based 
resources and ensures
+     * that if the connection is successfully established that the connection 
established event
+     * is triggered once before moving on to sending only connection 
interrupted and restored
+     * events.  
+     */
+    protected abstract class CreateConnectionRequest extends FailoverRequest {
+
+        /**
+         * @param watcher
+         */
+        public CreateConnectionRequest(AsyncResult watcher) {
+            super(watcher);
+        }
+
+        @Override
+        public void onSuccess() {
+            serializer.execute(new Runnable() {
+                @Override
+                public void run() {
+                    if (firstConnection) {
+                        LOG.trace("First connection requst has completed:");
+                        
FailoverProvider.this.messageFactory.set(provider.getMessageFactory());
+                        
listener.onConnectionEstablished(provider.getRemoteURI());
+                        firstConnection = false;
+                    } else {
+                        LOG.warn("A second call to a CreateConnectionRequest 
not expected.");
+                    }
+
+                    CreateConnectionRequest.this.signalConnected();
+                }
+            });
+        }
+
+        public void signalConnected() {
+            super.onSuccess();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6485baab/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
index 77cf6ce..4945a92 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java
@@ -42,6 +42,7 @@ public class JmsAmqpDiscoveryTest extends AmqpTestSupport 
implements JmsConnecti
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JmsAmqpDiscoveryTest.class);
 
+    private CountDownLatch connected;
     private CountDownLatch interrupted;
     private CountDownLatch restored;
     private JmsConnection jmsConnection;
@@ -51,6 +52,7 @@ public class JmsAmqpDiscoveryTest extends AmqpTestSupport 
implements JmsConnecti
     public void setUp() throws Exception {
         super.setUp();
 
+        connected = new CountDownLatch(1);
         interrupted = new CountDownLatch(1);
         restored = new CountDownLatch(1);
     }
@@ -148,6 +150,12 @@ public class JmsAmqpDiscoveryTest extends AmqpTestSupport 
implements JmsConnecti
     }
 
     @Override
+    public void onConnectionEstablished(URI remoteURI) {
+        LOG.info("Connection reports established.  Connected to -> {}", 
remoteURI);
+        connected.countDown();
+    }
+
+    @Override
     public void onConnectionInterrupted(URI remoteURI) {
         LOG.info("Connection reports interrupted. Lost connection to -> {}", 
remoteURI);
         interrupted.countDown();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to