Repository: qpid-broker-j Updated Branches: refs/heads/master fcad8ea35 -> 37b1a71ae
QPID-7960: [Java Broker, AMQP 1.0] Add support for undeliverable-here on Modified outcome Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ac6f6c30 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ac6f6c30 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ac6f6c30 Branch: refs/heads/master Commit: ac6f6c30a7dd859ec7e38708e118f1d8dff71c7e Parents: fcad8ea Author: Alex Rudyy <oru...@apache.org> Authored: Fri Oct 6 10:55:39 2017 +0100 Committer: Alex Rudyy <oru...@apache.org> Committed: Fri Oct 6 10:55:58 2017 +0100 ---------------------------------------------------------------------- .../protocol/v1_0/ConsumerTarget_1_0.java | 8 +- .../qpid/tests/protocol/v1_0/Interaction.java | 14 +++ .../protocol/v1_0/messaging/OutcomeTest.java | 106 +++++++++++++++++++ 3 files changed, 126 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac6f6c30/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index eb6d512..094164c 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -521,9 +521,13 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0> @Override public void postCommit() { - // TODO: add handling of undeliverable-here + Modified modifiedOutcome = (Modified) outcome; + if (Boolean.TRUE.equals(modifiedOutcome.getUndeliverableHere())) + { + _queueEntry.reject(getConsumer()); + } - if(Boolean.TRUE.equals(((Modified)outcome).getDeliveryFailed())) + if(Boolean.TRUE.equals(modifiedOutcome.getDeliveryFailed())) { incrementDeliveryCountOrRouteToAlternateOrDiscard(); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac6f6c30/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java index 6b59960..0b91273 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java @@ -105,6 +105,7 @@ public class Interaction private int _deliveryIdCounter; private List<Transfer> _latestDelivery; private Object _decodedLatestDelivery; + private UnsignedInteger _latestDeliveryId; Interaction(final FrameTransport frameTransport) { @@ -604,6 +605,11 @@ public class Interaction return this; } + public Interaction flowNextIncomingIdFromLatestDelivery() + { + return flowNextIncomingId(_latestDeliveryId.add(UnsignedInteger.ONE)); + } + public Interaction flowOutgoingWindow(final UnsignedInteger outgoingWindow) { _flow.setOutgoingWindow(outgoingWindow); @@ -850,6 +856,12 @@ public class Interaction return this; } + public Interaction dispositionFirstFromLatestDelivery() + { + _disposition.setFirst(_latestDeliveryId); + return this; + } + public Interaction disposition() throws Exception { sendPerformativeAndChainFuture(copyDisposition(_disposition), _sessionChannel); @@ -1064,6 +1076,7 @@ public class Interaction { sync(); _latestDelivery = receiveAllTransfers(); + _latestDeliveryId = _latestDelivery.size() > 0 ? _latestDelivery.get(0).getDeliveryId() : null; return this; } @@ -1109,4 +1122,5 @@ public class Interaction { return new InteractionTransactionalState(handle); } + } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ac6f6c30/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java new file mode 100644 index 0000000..f081006 --- /dev/null +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v1_0.messaging; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.net.InetSocketAddress; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified; +import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; +import org.apache.qpid.server.protocol.v1_0.type.transport.Open; +import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.tests.protocol.v1_0.FrameTransport; +import org.apache.qpid.tests.protocol.v1_0.Interaction; +import org.apache.qpid.tests.protocol.v1_0.SpecificationTest; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; + +public class OutcomeTest extends BrokerAdminUsingTestBase +{ + private InetSocketAddress _brokerAddress; + + @Before + public void setUp() + { + getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME); + _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + } + + + @Test + @SpecificationTest(section = "3.4.5", description = "If the undeliverable-here is set, then any messages released" + + " MUST NOT be redelivered to the modifying link endpoint.") + public void modifiedOutcomeWithUndeliverableHere() throws Exception + { + getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message1"); + getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message2"); + + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction() + .negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.RECEIVER) + .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME) + .attach().consumeResponse(Attach.class) + .flowIncomingWindow(UnsignedInteger.ONE) + .flowLinkCredit(UnsignedInteger.ONE) + .flowHandleFromLinkHandle() + .flow() + .receiveDelivery() + + .decodeLatestDelivery(); + + Object firstDeliveryPayload = interaction.getDecodedLatestDelivery(); + assertThat(firstDeliveryPayload, is(equalTo("message1"))); + + Modified modifiedOutcome = new Modified(); + modifiedOutcome.setUndeliverableHere(Boolean.TRUE); + interaction.dispositionSettled(true) + .dispositionRole(Role.RECEIVER) + .dispositionFirstFromLatestDelivery() + .dispositionState(modifiedOutcome) + .disposition() + .flowIncomingWindow(UnsignedInteger.valueOf(2)) + .flowLinkCredit(UnsignedInteger.valueOf(2)) + .flowNextIncomingIdFromLatestDelivery() + .flow() + .receiveDelivery() + .decodeLatestDelivery(); + ; + + Object secondDeliveryPayload = interaction.getDecodedLatestDelivery(); + assertThat(secondDeliveryPayload, is(equalTo("message2"))); + + // verify that no unexpected performative is received by closing + transport.doCloseConnection(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org