Repository: qpid-jms
Updated Branches:
  refs/heads/master d1f0b32ba -> f29381d98


Ensure that pending requests also fail when the connection is lost;
otherwise they can hang.  Added some tests that showed this happening
intermittently.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f29381d9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f29381d9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f29381d9

Branch: refs/heads/master
Commit: f29381d9869865ff0a5c4e0f1fd4ece61196f604
Parents: d1f0b32
Author: Timothy Bish <tabish...@gmail.com>
Authored: Tue Mar 3 11:00:54 2015 -0500
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Tue Mar 3 11:01:36 2015 -0500

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  35 ++---
 .../org/apache/qpid/jms/JmsConnectionTest.java  |  27 ++++
 .../jms/consumer/JmsMessageConsumerTest.java    | 136 +++++++++++++++++++
 .../apache/qpid/jms/session/JmsSessionTest.java |  33 +++++
 .../qpid/jms/support/AmqpTestSupport.java       |  14 ++
 5 files changed, 229 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f29381d9/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 0face7b..71e00d3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -48,6 +48,7 @@ import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.meta.JmsTransactionInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.NoOpAsyncResult;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderClosedException;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
@@ -92,6 +93,7 @@ public class AmqpProvider implements Provider, TransportListener {
     //       brokers that don't currently handle the unsigned range well.
     private static final int DEFAULT_CHANNEL_MAX = 32767;
     private static final AtomicInteger PROVIDER_SEQUENCE = new AtomicInteger();
+    private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
 
     private ProviderListener listener;
     private AmqpConnection connection;
@@ -177,7 +179,7 @@ public class AmqpProvider implements Provider, TransportListener {
 
                         if (connection != null) {
                             connection.close(request);
-                            pumpToProtonTransport();
+                            pumpToProtonTransport(request);
                         } else {
                             request.onSuccess();
                         }
@@ -294,7 +296,7 @@ public class AmqpProvider implements Provider, TransportListener {
                         }
                     });
 
-                    pumpToProtonTransport();
+                    pumpToProtonTransport(request);
                 } catch (Exception error) {
                     request.onFailure(error);
                 }
@@ -321,7 +323,7 @@ public class AmqpProvider implements Provider, TransportListener {
                         }
                     });
 
-                    pumpToProtonTransport();
+                    pumpToProtonTransport(request);
                 } catch (Exception error) {
                     request.onFailure(error);
                 }
@@ -348,7 +350,7 @@ public class AmqpProvider implements Provider, TransportListener {
                         }
                     });
 
-                    pumpToProtonTransport();
+                    pumpToProtonTransport(request);
                 } catch (Exception error) {
                     request.onFailure(error);
                 }
@@ -404,7 +406,7 @@ public class AmqpProvider implements Provider, TransportListener {
                         }
                     });
 
-                    pumpToProtonTransport();
+                    pumpToProtonTransport(request);
                 } catch (Exception error) {
                     request.onFailure(error);
                 }
@@ -433,7 +435,7 @@ public class AmqpProvider implements Provider, TransportListener {
                     }
 
                     boolean couldSend = producer.send(envelope, request);
-                    pumpToProtonTransport();
+                    pumpToProtonTransport(request);
                     if (couldSend && envelope.isSendAsync()) {
                         request.onSuccess();
                     }
@@ -455,7 +457,7 @@ public class AmqpProvider implements Provider, TransportListener {
                     checkClosed();
                     AmqpSession amqpSession = connection.getSession(sessionId);
                     amqpSession.acknowledge();
-                    pumpToProtonTransport();
+                    pumpToProtonTransport(request);
                     request.onSuccess();
                 } catch (Exception error) {
                     request.onFailure(error);
@@ -488,9 +490,9 @@ public class AmqpProvider implements Provider, TransportListener {
 
                     if (consumer.getSession().isAsyncAck()) {
                         request.onSuccess();
-                        pumpToProtonTransport();
+                        pumpToProtonTransport(request);
                     } else {
-                        pumpToProtonTransport();
+                        pumpToProtonTransport(request);
                         request.onSuccess();
                     }
                 } catch (Exception error) {
@@ -511,7 +513,7 @@ public class AmqpProvider implements Provider, TransportListener {
                     checkClosed();
                     AmqpSession session = connection.getSession(sessionId);
                     session.commit(request);
-                    pumpToProtonTransport();
+                    pumpToProtonTransport(request);
                 } catch (Exception error) {
                     request.onFailure(error);
                 }
@@ -530,7 +532,7 @@ public class AmqpProvider implements Provider, TransportListener {
                     checkClosed();
                     AmqpSession session = connection.getSession(sessionId);
                     session.rollback(request);
-                    pumpToProtonTransport();
+                    pumpToProtonTransport(request);
                 } catch (Exception error) {
                     request.onFailure(error);
                 }
@@ -549,7 +551,7 @@ public class AmqpProvider implements Provider, TransportListener {
                     checkClosed();
                     AmqpSession session = connection.getSession(sessionId);
                     session.recover();
-                    pumpToProtonTransport();
+                    pumpToProtonTransport(request);
                     request.onSuccess();
                 } catch (Exception error) {
                     request.onFailure(error);
@@ -568,7 +570,7 @@ public class AmqpProvider implements Provider, TransportListener {
                 try {
                     checkClosed();
                     connection.unsubscribe(subscription, request);
-                    pumpToProtonTransport();
+                    pumpToProtonTransport(request);
                 } catch (Exception error) {
                     request.onFailure(error);
                 }
@@ -595,7 +597,7 @@ public class AmqpProvider implements Provider, TransportListener {
                     }
 
                     consumer.pull(timeout);
-                    pumpToProtonTransport();
+                    pumpToProtonTransport(request);
                     request.onSuccess();
                 } catch (Exception error) {
                     request.onFailure(error);
@@ -651,7 +653,7 @@ public class AmqpProvider implements Provider, TransportListener {
                 // Process the state changes from the latest data and then answer back
                 // any pending updates to the Broker.
                 processUpdates();
-                pumpToProtonTransport();
+                pumpToProtonTransport(NOOP_REQUEST);
             }
         });
     }
@@ -764,7 +766,7 @@ public class AmqpProvider implements Provider, TransportListener {
         }
     }
 
-    private void pumpToProtonTransport() {
+    private void pumpToProtonTransport(AsyncResult request) {
         try {
             boolean done = false;
             while (!done) {
@@ -785,6 +787,7 @@ public class AmqpProvider implements Provider, TransportListener {
             }
         } catch (IOException e) {
             fireProviderException(e);
+            request.onFailure(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f29381d9/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
index 2adcfcb..32a7c9c 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -23,9 +23,13 @@ import static org.junit.Assert.fail;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.DeliveryMode;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.JMSSecurityException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 
 import org.apache.qpid.jms.support.AmqpTestSupport;
@@ -122,6 +126,29 @@ public class JmsConnectionTest extends AmqpTestSupport {
         connection.start();
     }
 
+    @Test(timeout=30000)
+    public void testBrokerStopWontHangConnectionClose() throws Exception {
+        connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        connection.start();
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        Message m = session.createTextMessage("Sample text");
+        producer.send(m);
+
+        stopPrimaryBroker();
+
+        try {
+            connection.close();
+        } catch (Exception ex) {
+            LOG.error("Should not thrown on disconnected connection close(): {}", ex);
+            fail("Should not have thrown an exception.");
+        }
+    }
+
     @Test(timeout=60000)
     public void testConnectionExceptionBrokerStop() throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f29381d9/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
index b514b08..d2a3b4d 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.JMSSecurityException;
@@ -469,4 +470,139 @@ public class JmsMessageConsumerTest extends AmqpTestSupport {
         Queue queue = session.createQueue(name.getMethodName());
         session.createConsumer(queue, "3+5");
     }
+
+    @Test(timeout=30000)
+    public void testConsumerReceiveNoWaitThrowsWhenBrokerStops() throws Exception {
+        final CountDownLatch consumerReady = new CountDownLatch(1);
+        final CountDownLatch connectionFailed = new CountDownLatch(1);
+
+        connection = createAmqpConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+
+            @Override
+            public void onException(JMSException exception) {
+                LOG.info("Connection to Broker stopped");
+                connectionFailed.countDown();
+            }
+        });
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        connection.start();
+
+        final MessageConsumer consumer=session.createConsumer(queue);
+        Testable test = new Testable() {
+
+            @Override
+            public synchronized void run() {
+                try {
+                    consumerReady.countDown();
+                    assertTrue("Broker connection needs to fail.", connectionFailed.await(20, TimeUnit.SECONDS));
+
+                    // Might not propagate state right away, so check a few times.
+                    for (int i = 0; i < 250; i++) {
+                        consumer.receiveNoWait();
+                        TimeUnit.MILLISECONDS.sleep(3);
+                    }
+
+                    failure = "Should have thrown an IllegalStateException";
+                } catch (Exception ex) {
+                    LOG.info("Caught exception on receiveNoWait: {}", ex);
+                }
+            }
+        };
+
+        new Thread(test).start();
+        assertTrue(consumerReady.await(20, TimeUnit.SECONDS));
+
+        stopPrimaryBroker();
+
+        assertTrue("Consumer did not fail as expected", test.passed());
+    }
+
+    @Test(timeout=30000)
+    public void testConsumerReceiveTimedReturnsIfConnectionLost() throws Exception {
+        final CountDownLatch consumerReady = new CountDownLatch(1);
+
+        connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        connection.start();
+
+        final MessageConsumer consumer=session.createConsumer(queue);
+
+        Testable test = new Testable() {
+            @Override
+            public synchronized void run() {
+                try {
+                    consumer.receive(1);
+                    new Thread(new Runnable() {
+
+                        @Override
+                        public void run() {
+                            try {
+                                TimeUnit.MILLISECONDS.sleep(2);
+                            } catch (InterruptedException e) {
+                            }
+                            consumerReady.countDown();
+                        }
+                    }).start();
+                    consumer.receive(TimeUnit.SECONDS.toMillis(30));
+                } catch (Exception ex) {
+                    LOG.info("Caught exception on receive(): {}", ex);
+                    failure = "Should not have thrown: " + ex.getMessage();
+                }
+            }
+        };
+
+        new Thread(test).start();
+        assertTrue(consumerReady.await(20, TimeUnit.SECONDS));
+
+        stopPrimaryBroker();
+
+        assertTrue(test.passed());
+    }
+
+    @Test(timeout=30000)
+    public void testConsumerReceiveReturnsIfConnectionLost() throws Exception {
+        final CountDownLatch consumerReady = new CountDownLatch(1);
+
+        connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        connection.start();
+
+        final MessageConsumer consumer=session.createConsumer(queue);
+
+        Testable test = new Testable() {
+            @Override
+            public synchronized void run() {
+                try {
+                    consumer.receive(1);
+                    new Thread(new Runnable() {
+
+                        @Override
+                        public void run() {
+                            try {
+                                TimeUnit.MILLISECONDS.sleep(2);
+                            } catch (InterruptedException e) {
+                            }
+                            consumerReady.countDown();
+                        }
+                    }).start();
+                    consumer.receive();
+                } catch (Exception ex) {
+                    LOG.info("Caught exception on receive(): {}", ex);
+                    failure = "Should not have thrown: " + ex.getMessage();
+                }
+            }
+        };
+
+        new Thread(test).start();
+        assertTrue(consumerReady.await(20, TimeUnit.SECONDS));
+
+        stopPrimaryBroker();
+
+        assertTrue(test.passed());
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f29381d9/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/session/JmsSessionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/session/JmsSessionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/session/JmsSessionTest.java
index 63d0636..e164be8 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/session/JmsSessionTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/session/JmsSessionTest.java
@@ -17,6 +17,7 @@
 package org.apache.qpid.jms.session;
 
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -79,4 +80,36 @@ public class JmsSessionTest extends AmqpTestSupport {
         session.close();
         session.close();
     }
+
+    @Test(timeout=30000)
+    public void testConsumerCreateThrowsWhenBrokerStops() throws Exception {
+        connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        connection.start();
+
+        stopPrimaryBroker();
+        try {
+            session.createConsumer(queue);
+            fail("Should have thrown an IllegalStateException");
+        } catch (Exception ex) {
+            LOG.info("Caught exception on create consumer: {}", ex);
+        }
+    }
+
+    @Test(timeout=30000)
+    public void testProducerCreateThrowsWhenBrokerStops() throws Exception {
+        connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        connection.start();
+
+        stopPrimaryBroker();
+        try {
+            session.createProducer(queue);
+            fail("Should have thrown an IllegalStateException");
+        } catch (Exception ex) {
+            LOG.info("Caught exception on create producer: {}", ex);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f29381d9/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java
index 97b3fb5..5161b0d 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java
@@ -141,4 +141,18 @@ public class AmqpTestSupport extends QpidJmsTestSupport {
         }
         return factory;
     }
+
+    public abstract class Testable implements Runnable {
+
+        protected String failure;
+
+        @Override
+        public String toString() {
+            return failure;
+        }
+
+        public synchronized boolean passed() {
+            return failure == null;
+        }
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to