Repository: qpid-broker-j
Updated Branches:
  refs/heads/master fcad8ea35 -> 37b1a71ae


QPID-7960: [Java Broker, AMQP 1.0] Add support for undeliverable-here on Modified outcome


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ac6f6c30
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ac6f6c30
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ac6f6c30

Branch: refs/heads/master
Commit: ac6f6c30a7dd859ec7e38708e118f1d8dff71c7e
Parents: fcad8ea
Author: Alex Rudyy <oru...@apache.org>
Authored: Fri Oct 6 10:55:39 2017 +0100
Committer: Alex Rudyy <oru...@apache.org>
Committed: Fri Oct 6 10:55:58 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/ConsumerTarget_1_0.java       |   8 +-
 .../qpid/tests/protocol/v1_0/Interaction.java   |  14 +++
 .../protocol/v1_0/messaging/OutcomeTest.java    | 106 +++++++++++++++++++
 3 files changed, 126 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac6f6c30/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index eb6d512..094164c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -521,9 +521,13 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
                     @Override
                     public void postCommit()
                     {
-                        // TODO: add handling of undeliverable-here
+                        Modified modifiedOutcome = (Modified) outcome;
+                        if 
(Boolean.TRUE.equals(modifiedOutcome.getUndeliverableHere()))
+                        {
+                            _queueEntry.reject(getConsumer());
+                        }
 
-                        
if(Boolean.TRUE.equals(((Modified)outcome).getDeliveryFailed()))
+                        
if(Boolean.TRUE.equals(modifiedOutcome.getDeliveryFailed()))
                         {
                             
incrementDeliveryCountOrRouteToAlternateOrDiscard();
                         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac6f6c30/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 6b59960..0b91273 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -105,6 +105,7 @@ public class Interaction
     private int _deliveryIdCounter;
     private List<Transfer> _latestDelivery;
     private Object _decodedLatestDelivery;
+    private UnsignedInteger _latestDeliveryId;
 
     Interaction(final FrameTransport frameTransport)
     {
@@ -604,6 +605,11 @@ public class Interaction
         return this;
     }
 
+    public Interaction flowNextIncomingIdFromLatestDelivery()
+    {
+        return flowNextIncomingId(_latestDeliveryId.add(UnsignedInteger.ONE));
+    }
+
     public Interaction flowOutgoingWindow(final UnsignedInteger outgoingWindow)
     {
         _flow.setOutgoingWindow(outgoingWindow);
@@ -850,6 +856,12 @@ public class Interaction
         return this;
     }
 
+    public Interaction dispositionFirstFromLatestDelivery()
+    {
+        _disposition.setFirst(_latestDeliveryId);
+        return this;
+    }
+
     public Interaction disposition() throws Exception
     {
         sendPerformativeAndChainFuture(copyDisposition(_disposition), 
_sessionChannel);
@@ -1064,6 +1076,7 @@ public class Interaction
     {
         sync();
         _latestDelivery = receiveAllTransfers();
+        _latestDeliveryId = _latestDelivery.size() > 0 ? 
_latestDelivery.get(0).getDeliveryId() : null;
         return this;
     }
 
@@ -1109,4 +1122,5 @@ public class Interaction
     {
         return new InteractionTransactionalState(handle);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac6f6c30/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
new file mode 100644
index 0000000..f081006
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.messaging;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class OutcomeTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        _brokerAddress = 
getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+
+    @Test
+    @SpecificationTest(section = "3.4.5", description = "If the 
undeliverable-here is set, then any messages released"
+                                                        + " MUST NOT be 
redelivered to the modifying link endpoint.")
+    public void modifiedOutcomeWithUndeliverableHere() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, 
"message1");
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, 
"message2");
+
+        try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction()
+                                                     
.negotiateProtocol().consumeResponse()
+                                                     
.open().consumeResponse(Open.class)
+                                                     
.begin().consumeResponse(Begin.class)
+                                                     .attachRole(Role.RECEIVER)
+                                                     
.attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     
.attach().consumeResponse(Attach.class)
+                                                     
.flowIncomingWindow(UnsignedInteger.ONE)
+                                                     
.flowLinkCredit(UnsignedInteger.ONE)
+                                                     
.flowHandleFromLinkHandle()
+                                                     .flow()
+                                                     .receiveDelivery()
+
+                                                     .decodeLatestDelivery();
+
+            Object firstDeliveryPayload = 
interaction.getDecodedLatestDelivery();
+            assertThat(firstDeliveryPayload, is(equalTo("message1")));
+
+            Modified modifiedOutcome = new Modified();
+            modifiedOutcome.setUndeliverableHere(Boolean.TRUE);
+            interaction.dispositionSettled(true)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionFirstFromLatestDelivery()
+                       .dispositionState(modifiedOutcome)
+                       .disposition()
+                       .flowIncomingWindow(UnsignedInteger.valueOf(2))
+                       .flowLinkCredit(UnsignedInteger.valueOf(2))
+                       .flowNextIncomingIdFromLatestDelivery()
+                       .flow()
+                       .receiveDelivery()
+                       .decodeLatestDelivery();
+            ;
+
+            Object secondDeliveryPayload = 
interaction.getDecodedLatestDelivery();
+            assertThat(secondDeliveryPayload, is(equalTo("message2")));
+
+            // verify that no unexpected performative is received by closing
+            transport.doCloseConnection();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to