Repository: qpid-jms
Updated Branches:
  refs/heads/master 6553cfd5b -> 6e442f4c6


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 8eaf707..d6dc443 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -43,11 +43,14 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.QueueBrowser;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.qpid.jms.JmsCompletionListener;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.JmsMessageProducer;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.JmsSendTimedOutException;
 import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
@@ -56,6 +59,7 @@ import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
 import 
org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
 import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
@@ -1034,6 +1038,153 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    /**
+     * Sends one message asynchronously over a failover connection and checks that the
+     * supplied completion listener is called back with no exception recorded and with
+     * a {@link TextMessage} as the completed message.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverPassthroughOfCompletedAsyncSend() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final Connection connection = establishAnonymousConnecton(
+                "failover.reconnectDelay=2000&failover.maxReconnectAttempts=5", testPeer);
+
+            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectBegin();
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+            testPeer.expectClose();
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+            producer.send(message, listener);
+
+            // Keep the wait well below the 20s JUnit timeout so that a missing callback
+            // fails on this assertion (with its message) instead of a generic test timeout.
+            assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Sends a message asynchronously whose transfer the test peer answers with
+     * {@link Rejected}, and checks the completion listener records an exception.
+     * The same message is then sent again with a fresh listener and is expected to
+     * complete without an exception.
+     *
+     * NOTE(review): "Faliover" in the method name is a typo for "Failover"; the name is
+     * left as is so existing references to this test keep working.
+     */
+    @Test(timeout = 20000)
+    public void testFalioverPassthroughOfRejectedAsyncCompletionSend() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final JmsConnection connection = establishAnonymousConnecton(
+                "failover.reconnectDelay=2000&failover.maxReconnectAttempts=5", testPeer);
+
+            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectBegin();
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            Message message = session.createTextMessage("content");
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false, new Rejected(), true);
+
+            assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+            try {
+                producer.send(message, listener);
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            // Both waits together stay well below the 20s JUnit timeout, so a missing
+            // callback fails on the assertion message rather than a generic test timeout.
+            assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNotNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+            testPeer.expectClose();
+
+            // Fresh listener so the second send's outcome is not mixed with the rejected one.
+            listener = new TestJmsCompletionListener();
+            try {
+                producer.send(message, listener);
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    /**
+     * Sends MSG_COUNT messages whose transfers the test peer never answers, plus one
+     * more that it accepts, after which the peer drops the connection. The shared
+     * completion listener is expected to see MSG_COUNT failures and one success.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverConnectionLossFailsWaitingAsyncCompletionSends() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final JmsConnection connection = establishAnonymousConnecton(
+                "failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
+                testPeer);
+
+            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectBegin();
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+            JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+            final int MSG_COUNT = 5;
+
+            Message message = session.createTextMessage("content");
+            for (int i = 0; i < MSG_COUNT; ++i) {
+                testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+            }
+
+            // Accept one which shouldn't complete until after the others have failed.
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false, new Accepted(), true);
+            testPeer.dropAfterLastHandler();
+
+            // One latch count per send: MSG_COUNT unanswered sends plus the accepted one.
+            TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT + 1);
+            try {
+                for (int i = 0; i < MSG_COUNT; ++i) {
+                    producer.send(message, listener);
+                }
+
+                producer.send(message, listener);
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            // Keep the wait well below the 20s JUnit timeout so that missing callbacks
+            // fail on this assertion (with its message) instead of a generic test timeout.
+            assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertEquals(MSG_COUNT, listener.errorCount);
+            assertEquals(1, listener.successCount);
+            assertNotNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+        }
+    }
+
     private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) 
throws JMSException {
         return establishAnonymousConnecton(null, null, peers);
     }
@@ -1076,4 +1227,47 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
     private String createPeerURI(TestAmqpPeer peer, String params) {
         return "amqp://localhost:" + peer.getServerPort() + (params != null ? 
"?" + params : "");
     }
+
+    /**
+     * Completion listener used by the async send tests. It counts successful and
+     * failed completions, remembers the most recent message and exception it was
+     * given, and releases a latch once the expected number of callbacks has arrived.
+     *
+     * The callbacks are synchronized because the counters are bumped with {@code ++},
+     * which is a read-modify-write that {@code volatile} alone does not make atomic;
+     * this keeps the counts exact should callbacks ever arrive on more than one thread.
+     */
+    private class TestJmsCompletionListener implements JmsCompletionListener {
+
+        private final CountDownLatch completed;
+
+        public volatile int successCount;
+        public volatile int errorCount;
+
+        public volatile Message message;
+        public volatile Exception exception;
+
+        /** Creates a listener that waits for a single callback. */
+        public TestJmsCompletionListener() {
+            this(1);
+        }
+
+        /**
+         * @param expected number of callbacks (success or failure) to wait for
+         */
+        public TestJmsCompletionListener(int expected) {
+            this.completed = new CountDownLatch(expected);
+        }
+
+        /**
+         * Blocks until the expected number of callbacks has been received or the
+         * timeout elapses.
+         *
+         * @return true if all expected callbacks arrived before the timeout
+         */
+        public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException {
+            return completed.await(timeout, units);
+        }
+
+        @Override
+        public synchronized void onCompletion(Message message) {
+            LOG.info("JmsCompletionListener onCompletion called with message: {}", message);
+            this.message = message;
+            this.successCount++;
+
+            // Count down last so a waiter released by the latch sees the updated state.
+            completed.countDown();
+        }
+
+        @Override
+        public synchronized void onException(Message message, Exception exception) {
+            LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception);
+
+            this.message = message;
+            this.exception = exception;
+            this.errorCount++;
+
+            // Count down last so a waiter released by the latch sees the updated state.
+            completed.countDown();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
index 210ce2c..35d305c 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
@@ -274,7 +274,17 @@ public class MockProvider implements Provider {
                 try {
                     checkClosed();
                     stats.recordSendCall();
+
                     request.onSuccess();
+                    if (envelope.isCompletionRequired()) {
+                        if (configuration.isDelayCompletionCalls()) {
+                            context.recordPendingCompletion(MockProvider.this, 
envelope);
+                        } else {
+                            if (listener != null) {
+                                listener.onCompletedMessageSend(envelope);
+                            }
+                        }
+                    }
                 } catch (Exception error) {
                     request.onFailure(error);
                 }
@@ -422,7 +432,6 @@ public class MockProvider implements Provider {
         });
     }
 
-
     /**
      * Switch state to closed without sending any notifications
      */
@@ -489,7 +498,6 @@ public class MockProvider implements Provider {
 
     //----- Implementation details 
-------------------------------------------//
 
-
     private void checkClosed() throws ProviderClosedException {
         if (closed.get()) {
             throw new ProviderClosedException("This Provider is already 
closed");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java
index 7c78fff..d8c9019 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderConfiguration.java
@@ -25,6 +25,8 @@ public class MockProviderConfiguration {
     private boolean failOnStart;
     private boolean failOnClose;
 
+    private boolean delayCompletionCalls;
+
     public boolean isFailOnConnect() {
         return failOnConnect;
     }
@@ -48,4 +50,12 @@ public class MockProviderConfiguration {
     public void setFailOnClose(boolean value) {
         this.failOnClose = value;
     }
+
+    /**
+     * @return the delayCompletionCalls flag. MockProvider's send path consults it:
+     *         when true, a send that requires completion is recorded as pending
+     *         instead of the provider listener being notified right away.
+     */
+    public boolean isDelayCompletionCalls() {
+        return delayCompletionCalls;
+    }
+
+    /**
+     * Sets the delayCompletionCalls flag.
+     *
+     * @param delayCompletionCalls
+     *        true to have MockProvider record completion-required sends as pending
+     *        rather than notifying the provider listener immediately
+     */
+    public void setDelayCompletionCalls(boolean delayCompletionCalls) {
+        this.delayCompletionCalls = delayCompletionCalls;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java
index 99fbfbc..a964282 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockRemotePeer.java
@@ -18,10 +18,17 @@ package org.apache.qpid.jms.provider.mock;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsResource;
 
 /**
@@ -42,6 +49,9 @@ public class MockRemotePeer {
     private ResourceLifecycleFilter stopFilter;
     private ResourceLifecycleFilter destroyFilter;
 
+    private final Map<Destination, List<PendingCompletion>> pendingCompletions 
=
+        new ConcurrentHashMap<Destination, List<PendingCompletion>>();
+
     public void connect(MockProvider provider) throws IOException {
         if (offline) {
             throw new IOException();
@@ -146,4 +156,117 @@ public class MockRemotePeer {
     public void setResourceDestroyFilter(ResourceLifecycleFilter filter) {
         destroyFilter = filter;
     }
+
+    //----- Controls handling of Message Send Completions 
--------------------//
+
+    /**
+     * Remembers a send whose completion has not been signalled yet, filed under the
+     * destination of the given envelope.
+     *
+     * @param provider
+     *        the provider whose listener will later be notified for this send
+     * @param envelope
+     *        the outbound dispatch awaiting completion
+     */
+    public void recordPendingCompletion(MockProvider provider, JmsOutboundMessageDispatch envelope) {
+        final Destination target = envelope.getDestination();
+
+        // Create the per-destination list the first time a send is recorded for it.
+        List<PendingCompletion> sendsForTarget = pendingCompletions.get(target);
+        if (sendsForTarget == null) {
+            sendsForTarget = new ArrayList<PendingCompletion>();
+            pendingCompletions.put(target, sendsForTarget);
+        }
+
+        sendsForTarget.add(new PendingCompletion(provider, envelope));
+    }
+
+    /**
+     * Signals successful completion for every send still pending on the given
+     * destination and forgets them. Sends pending on other destinations are left
+     * untouched. Does nothing when no sends are recorded for the destination.
+     *
+     * @param destination
+     *        the destination whose pending sends should be completed
+     */
+    public void completeAllPendingSends(Destination destination) {
+        // Detach only this destination's list. Walking every list in the map would
+        // complete sends for other destinations while leaving them recorded as pending.
+        List<PendingCompletion> pendingSends = pendingCompletions.remove(destination);
+        if (pendingSends == null) {
+            return;
+        }
+
+        for (PendingCompletion pending : pendingSends) {
+            pending.provider.getProviderListener().onCompletedMessageSend(pending.envelope);
+        }
+    }
+
+    /**
+     * Signals failure, with the given error, for every send still pending on the
+     * given destination and forgets them. Sends pending on other destinations are
+     * left untouched. Does nothing when no sends are recorded for the destination.
+     *
+     * @param destination
+     *        the destination whose pending sends should be failed
+     * @param error
+     *        the error handed to the provider listener for each failed send
+     */
+    public void failAllPendingSends(Destination destination, Exception error) {
+        // Detach only this destination's list. Walking every list in the map would
+        // fail sends for other destinations while leaving them recorded as pending.
+        List<PendingCompletion> pendingSends = pendingCompletions.remove(destination);
+        if (pendingSends == null) {
+            return;
+        }
+
+        for (PendingCompletion pending : pendingSends) {
+            pending.provider.getProviderListener().onFailedMessageSend(pending.envelope, error);
+        }
+    }
+
+    /**
+     * Signals successful completion for the pending send(s) whose JMSMessageID equals
+     * that of the given message, looked up under the message's JMSDestination, and
+     * removes them from the pending list. Does nothing when no sends are recorded for
+     * that destination.
+     *
+     * @param message
+     *        the message whose pending send should be completed
+     *
+     * @throws JMSException if reading the destination or message ID fails
+     */
+    public void completePendingSend(Message message) throws JMSException {
+        List<PendingCompletion> pendingSends = pendingCompletions.get(message.getJMSDestination());
+        if (pendingSends == null) {
+            // No sends are recorded for this destination; previously this was an NPE.
+            return;
+        }
+
+        // Explicit iterator so matching entries can be removed while walking the list.
+        Iterator<PendingCompletion> iterator = pendingSends.iterator();
+        while (iterator.hasNext()) {
+            PendingCompletion pending = iterator.next();
+            if (pending.envelope.getMessage().getJMSMessageID().equals(message.getJMSMessageID())) {
+                pending.provider.getProviderListener().onCompletedMessageSend(pending.envelope);
+                iterator.remove();
+            }
+        }
+    }
+
+    /**
+     * Signals successful completion for the pending send(s) whose JMSMessageID equals
+     * that of the given envelope's message, looked up under the envelope's
+     * destination, and removes them from the pending list. Does nothing when no sends
+     * are recorded for that destination.
+     *
+     * @param envelope
+     *        the outbound dispatch whose pending send should be completed
+     *
+     * @throws JMSException if reading a message ID fails
+     */
+    public void completePendingSend(JmsOutboundMessageDispatch envelope) throws JMSException {
+        List<PendingCompletion> pendingSends = pendingCompletions.get(envelope.getDestination());
+        if (pendingSends == null) {
+            // No sends are recorded for this destination; previously this was an NPE.
+            return;
+        }
+
+        // Explicit iterator so matching entries can be removed while walking the list.
+        Iterator<PendingCompletion> iterator = pendingSends.iterator();
+        while (iterator.hasNext()) {
+            PendingCompletion pending = iterator.next();
+            if (pending.envelope.getMessage().getJMSMessageID().equals(envelope.getMessage().getJMSMessageID())) {
+                pending.provider.getProviderListener().onCompletedMessageSend(pending.envelope);
+                iterator.remove();
+            }
+        }
+    }
+
+    /**
+     * Signals failure, with the given error, for the pending send(s) whose
+     * JMSMessageID equals that of the given message, looked up under the message's
+     * JMSDestination, and removes them from the pending list. Does nothing when no
+     * sends are recorded for that destination.
+     *
+     * @param message
+     *        the message whose pending send should be failed
+     * @param error
+     *        the error handed to the provider listener
+     *
+     * @throws JMSException if reading the destination or message ID fails
+     */
+    public void failPendingSend(Message message, Exception error) throws JMSException {
+        List<PendingCompletion> pendingSends = pendingCompletions.get(message.getJMSDestination());
+        if (pendingSends == null) {
+            // No sends are recorded for this destination; previously this was an NPE.
+            return;
+        }
+
+        // Explicit iterator so matching entries can be removed while walking the list.
+        Iterator<PendingCompletion> iterator = pendingSends.iterator();
+        while (iterator.hasNext()) {
+            PendingCompletion pending = iterator.next();
+            if (pending.envelope.getMessage().getJMSMessageID().equals(message.getJMSMessageID())) {
+                pending.provider.getProviderListener().onFailedMessageSend(pending.envelope, error);
+                iterator.remove();
+            }
+        }
+    }
+
+    /**
+     * Signals failure, with the given error, for the pending send(s) whose
+     * JMSMessageID equals that of the given envelope's message, looked up under the
+     * envelope's destination, and removes them from the pending list. Does nothing
+     * when no sends are recorded for that destination.
+     *
+     * @param envelope
+     *        the outbound dispatch whose pending send should be failed
+     * @param error
+     *        the error handed to the provider listener
+     *
+     * @throws JMSException if reading a message ID fails
+     */
+    public void failPendingSend(JmsOutboundMessageDispatch envelope, Exception error) throws JMSException {
+        List<PendingCompletion> pendingSends = pendingCompletions.get(envelope.getDestination());
+        if (pendingSends == null) {
+            // No sends are recorded for this destination; previously this was an NPE.
+            return;
+        }
+
+        // Explicit iterator so matching entries can be removed while walking the list.
+        Iterator<PendingCompletion> iterator = pendingSends.iterator();
+        while (iterator.hasNext()) {
+            PendingCompletion pending = iterator.next();
+            if (pending.envelope.getMessage().getJMSMessageID().equals(envelope.getMessage().getJMSMessageID())) {
+                pending.provider.getProviderListener().onFailedMessageSend(pending.envelope, error);
+                iterator.remove();
+            }
+        }
+    }
+
+    /**
+     * Lists the envelopes of the sends currently recorded as pending for a destination.
+     *
+     * @param destination
+     *        the destination to look up
+     *
+     * @return a new list of the pending envelopes, or an empty list when nothing is
+     *         recorded for the destination
+     */
+    public List<JmsOutboundMessageDispatch> getPendingCompletions(Destination destination) {
+        if (!pendingCompletions.containsKey(destination)) {
+            return Collections.emptyList();
+        }
+
+        // Copy the envelopes out so callers never hold the internal bookkeeping list.
+        List<JmsOutboundMessageDispatch> envelopes = new ArrayList<JmsOutboundMessageDispatch>();
+        for (PendingCompletion entry : pendingCompletions.get(destination)) {
+            envelopes.add(entry.envelope);
+        }
+
+        return envelopes;
+    }
+
+    /**
+     * Pairs an outbound dispatch that is awaiting completion with the provider whose
+     * listener must be notified for it.
+     *
+     * Declared static because it never touches the enclosing MockRemotePeer, so there
+     * is no reason for each entry to carry a hidden reference to it.
+     */
+    private static final class PendingCompletion {
+
+        public final MockProvider provider;
+        public final JmsOutboundMessageDispatch envelope;
+
+        public PendingCompletion(MockProvider provider, JmsOutboundMessageDispatch envelope) {
+            this.provider = provider;
+            this.envelope = envelope;
+        }
+
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to