Set the outcomes for consumer sources, and set the default outcome to modified 
with delivery-failed=true and undeliverable-here=false. Work towards support 
for settling the deliveries prior to commit/rollback on transacted sessions.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/381f2397
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/381f2397
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/381f2397

Branch: refs/heads/master
Commit: 381f2397e8365e53c3eef45fe7cd71f591f730ac
Parents: 9b1f56e
Author: Robert Gemmell <[email protected]>
Authored: Wed Nov 19 16:52:54 2014 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Wed Nov 19 16:52:54 2014 +0000

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 12 +++++++
 .../jms/integration/SessionIntegrationTest.java | 36 ++++++++++++++++++++
 2 files changed, 48 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/381f2397/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 1a9be73..e33fddf 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -42,6 +42,8 @@ import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
@@ -139,6 +141,8 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     protected void configureSource(Source source) {
         Map<Symbol, DescribedType> filters = new HashMap<Symbol, 
DescribedType>();
+        Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, 
Rejected.DESCRIPTOR_SYMBOL,
+                                         Released.DESCRIPTOR_SYMBOL, 
Modified.DESCRIPTOR_SYMBOL};
 
         if (resource.getSubscriptionName() != null && 
!resource.getSubscriptionName().isEmpty()) {
             source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
@@ -149,6 +153,14 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
             source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
         }
 
+        source.setOutcomes(outcomes);
+
+        Modified modified = new Modified();
+        modified.setDeliveryFailed(true);
+        modified.setUndeliverableHere(false);
+
+        source.setDefaultOutcome(modified);
+
         if (resource.isNoLocal()) {
             filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL);
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/381f2397/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 3ba8801..0f4afae 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.qpid.jms.integration;
 
+import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -48,10 +50,14 @@ import 
org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
 import 
org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
 import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
@@ -515,4 +521,34 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout=5000)
+    public void 
testDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession() throws 
Exception {
+        try (TestAmqpPeer testPeer = new 
TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin(true);
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            SourceMatcher sourceMatcher = new SourceMatcher();
+            sourceMatcher.withAddress(equalTo("queue://" + queueName));
+            sourceMatcher.withDynamic(equalTo(false));
+            
sourceMatcher.withOutcomes(arrayContaining(Accepted.DESCRIPTOR_SYMBOL, 
Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, 
Modified.DESCRIPTOR_SYMBOL));
+            ModifiedMatcher outcomeMatcher = new 
ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(equalTo(false));
+            sourceMatcher.withDefaultOutcome(outcomeMatcher);
+
+            testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
+            testPeer.expectLinkFlow();
+
+            session.createConsumer(queue);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to