Author: kwall
Date: Mon Nov  7 11:04:22 2011
New Revision: 1198701

URL: http://svn.apache.org/viewvc?rev=1198701&view=rev
Log:
QPID-3536: 0-10 overrides JMS AcceptMode with a defaulted (not explicitly set) Link Reliability of UNRELIABLE

Applied patch from Andrew MacBean <andymacb...@gmail.com> and myself.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1198701&r1=1198700&r2=1198701&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Mon Nov  7 11:04:22 2011
@@ -1166,22 +1166,6 @@ public class AMQSession_0_10 extends AMQ
             
             int type = resolveAddressType(dest);
             
-            if (type == AMQDestination.QUEUE_TYPE && 
-                    dest.getLink().getReliability() == Reliability.UNSPECIFIED)
-            {
-                dest.getLink().setReliability(Reliability.AT_LEAST_ONCE);
-            }
-            else if (type == AMQDestination.TOPIC_TYPE && 
-                    dest.getLink().getReliability() == Reliability.UNSPECIFIED)
-            {
-                dest.getLink().setReliability(Reliability.UNRELIABLE);
-            }
-            else if (type == AMQDestination.TOPIC_TYPE && 
-                    dest.getLink().getReliability() == 
Reliability.AT_LEAST_ONCE)
-            {
-                throw new AMQException("AT-LEAST-ONCE is not yet supported for 
Topics");                      
-            }
-            
             switch (type)
             {
                 case AMQDestination.QUEUE_TYPE: 

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1198701&r1=1198700&r2=1198701&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Mon Nov  7 11:04:22 2011
@@ -204,7 +204,7 @@ public class BasicMessageConsumer_0_10 e
     private boolean checkPreConditions(AbstractJMSMessage message) throws 
AMQException
     {
         boolean messageOk = true;
-        // TODO Use a tag for fiding out if message filtering is done here or 
by the broker.
+        // TODO Use a tag for finding out if message filtering is done here or 
by the broker.
         try
         {
             if (_messageSelectorFilter != null)

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1198701&r1=1198700&r2=1198701&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
 Mon Nov  7 11:04:22 2011
@@ -20,18 +20,14 @@
  */
 package org.apache.qpid.client.messaging.address;
 
-import static 
org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED;
-
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
-
 public class Link
 { 
     public enum FilterType { SQL92, XQUERY, SUBJECT }
     
-    public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, 
EXACTLY_ONCE, UNSPECIFIED }
+    public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, 
EXACTLY_ONCE }
     
     protected String name;
     protected String _filter;
@@ -42,7 +38,7 @@ public class Link
     protected int _producerCapacity = 0;
     protected Node node;
     protected Subscription subscription;
-    protected Reliability reliability = UNSPECIFIED;
+    protected Reliability reliability = Reliability.AT_LEAST_ONCE;
     
     public Reliability getReliability()
     {

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1198701&r1=1198700&r2=1198701&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
 Mon Nov  7 11:04:22 2011
@@ -1070,19 +1070,6 @@ public class AddressBasedDestinationTest
         {
             assertTrue(e.getCause().getMessage().contains("The reliability 
mode 'exactly-once' is not yet supported"));
         }
-        
-        String addr4 = "ADDR:amq.topic/test;{link : {reliability : 
at-least-once}}";        
-        try
-        {
-            AMQAnyDestination dest = new AMQAnyDestination(addr4);
-            Session ssn = 
_connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
-            MessageConsumer cons = ssn.createConsumer(dest);
-            fail("An exception should be thrown indicating it's an unsupported 
combination");
-        }
-        catch(Exception e)
-        {
-            assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is 
not yet supported for Topics"));
-        }
     }
     
     private void acceptModeTest(String address, int expectedQueueDepth) throws 
Exception
@@ -1286,4 +1273,52 @@ public class AddressBasedDestinationTest
         Message m = consumer.receive(RECEIVE_TIMEOUT);
         assertNull("Unexpected message received", m);
     }
+
+    /**
+     * Tests that a client using a session in {@link 
Session#CLIENT_ACKNOWLEDGE} can correctly
+     * recover a session and re-receive the same message.
+     */
+    public void testTopicRereceiveAfterRecover() throws Exception
+    {
+        final Session jmsSession = 
_connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+        final Destination topic = 
jmsSession.createTopic("ADDR:amq.topic/topic1; {link:{name: queue1}}");
+
+        final MessageProducer prod = jmsSession.createProducer(topic);
+        final MessageConsumer consForTopic1 = jmsSession.createConsumer(topic);
+        final Message sentMessage = jmsSession.createTextMessage("Hello");
+
+        prod.send(sentMessage);
+        Message receivedMessage = consForTopic1.receive(1000);
+        assertNotNull("message should be received by consumer", 
receivedMessage);
+
+        jmsSession.recover();
+        receivedMessage = consForTopic1.receive(1000);
+        assertNotNull("message should be re-received by consumer after 
recover", receivedMessage);
+        receivedMessage.acknowledge();
+    }
+
+    /**
+    * Tests that a client using a session in {@link 
Session#SESSION_TRANSACTED} can correctly
+    * rollback a session and re-receive the same message.
+    */
+    public void testTopicRereceiveAfterRollback() throws Exception
+    {
+        final Session jmsSession = 
_connection.createSession(true,Session.SESSION_TRANSACTED);
+        final Destination topic = 
jmsSession.createTopic("ADDR:amq.topic/topic1; {link:{name: queue1}}");
+
+        final MessageProducer prod = jmsSession.createProducer(topic);
+        final MessageConsumer consForTopic1 = jmsSession.createConsumer(topic);
+        final Message sentMessage = jmsSession.createTextMessage("Hello");
+
+        prod.send(sentMessage);
+        jmsSession.commit();
+
+        Message receivedMessage = consForTopic1.receive(1000);
+        assertNotNull("message should be received by consumer", 
receivedMessage);
+
+        jmsSession.rollback();
+        receivedMessage = consForTopic1.receive(1000);
+        assertNotNull("message should be re-received by consumer after 
rollback", receivedMessage);
+        jmsSession.commit();
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to