Repository: qpid-proton Updated Branches: refs/heads/master 6d873ebed -> b67a2a943
PROTON-944: add ability to set a default state for use when settling/freeing received deliveries without having previously set/sent dispositon state for them Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b67a2a94 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b67a2a94 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b67a2a94 Branch: refs/heads/master Commit: b67a2a943017910bcf8bf67a05aafed93ab7b8b1 Parents: 6d873eb Author: Robert Gemmell <rob...@apache.org> Authored: Mon Jul 13 17:28:31 2015 +0100 Committer: Robert Gemmell <rob...@apache.org> Committed: Mon Jul 13 17:55:49 2015 +0100 ---------------------------------------------------------------------- .../org/apache/qpid/proton/engine/Delivery.java | 10 + .../qpid/proton/engine/impl/DeliveryImpl.java | 14 + .../qpid/proton/engine/impl/TransportImpl.java | 14 +- .../systemtests/DefaultDeliveryStateTest.java | 288 +++++++++++++++++++ 4 files changed, 323 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b67a2a94/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java index d08efc2..71932ed 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java @@ -108,4 +108,14 @@ public interface Delivery extends Extendable public boolean isBuffered(); + /** + * Configures a default DeliveryState to be used if a + * received delivery is settled/freed without any disposition + * state having been previously applied. + * + * @param state the default delivery state + */ + public void setDefaultDeliveryState(DeliveryState state); + + public DeliveryState getDefaultDeliveryState(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b67a2a94/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java index 3136d7a..896d8a9 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java @@ -49,6 +49,7 @@ public class DeliveryImpl implements Delivery private boolean _settled; private boolean _remoteSettled; private DeliveryState _remoteDeliveryState; + private DeliveryState _defaultDeliveryState = null; private static final int DELIVERY_STATE_CHANGED = 1; private static final int ABLE_TO_SEND = 2; @@ -420,6 +421,7 @@ public class DeliveryImpl implements Delivery .append(", _remoteSettled=").append(_remoteSettled) .append(", _remoteDeliveryState=").append(_remoteDeliveryState) .append(", _flags=").append(_flags) + .append(", _defaultDeliveryState=").append(_defaultDeliveryState) .append(", _transportDelivery=").append(_transportDelivery) .append(", _dataSize=").append(_dataSize) .append(", _complete=").append(_complete) @@ -434,4 +436,16 @@ public class DeliveryImpl implements Delivery return _dataSize; } + @Override + public void setDefaultDeliveryState(DeliveryState state) + { + _defaultDeliveryState = state; + } + + @Override + public DeliveryState getDefaultDeliveryState() + { + return _defaultDeliveryState; + } + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b67a2a94/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index 9cc73ce..0902a55 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -32,6 +32,7 @@ import org.apache.qpid.proton.amqp.transport.Attach; import org.apache.qpid.proton.amqp.transport.Begin; import org.apache.qpid.proton.amqp.transport.Close; import org.apache.qpid.proton.amqp.transport.ConnectionError; +import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.Detach; import org.apache.qpid.proton.amqp.transport.Disposition; import org.apache.qpid.proton.amqp.transport.End; @@ -640,15 +641,22 @@ public class TransportImpl extends EndpointImpl if (tpSession.isLocalChannelSet()) { + boolean settled = delivery.isSettled(); + DeliveryState localState = delivery.getLocalState(); + Disposition disposition = new Disposition(); disposition.setFirst(tpDelivery.getDeliveryId()); disposition.setLast(tpDelivery.getDeliveryId()); disposition.setRole(Role.RECEIVER); - disposition.setSettled(delivery.isSettled()); + disposition.setSettled(settled); + disposition.setState(localState); + + if(localState == null && settled) { + disposition.setState(delivery.getDefaultDeliveryState()); + } - disposition.setState(delivery.getLocalState()); writeFrame(tpSession.getLocalChannel(), disposition, null, null); - if (delivery.isSettled()) + if (settled) { tpDelivery.settled(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b67a2a94/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DefaultDeliveryStateTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DefaultDeliveryStateTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DefaultDeliveryStateTest.java new file mode 100644 index 0000000..3ad9d3a --- /dev/null +++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DefaultDeliveryStateTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.proton.systemtests; + +import static java.util.EnumSet.of; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertTrue; +import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; +import static org.apache.qpid.proton.engine.EndpointState.CLOSED; +import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED; +import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold; +import static org.junit.Assert.assertNull; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.logging.Logger; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Released; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.message.Message; +import org.junit.Test; + + +public class DefaultDeliveryStateTest extends EngineTestBase +{ + private static final Logger LOGGER = Logger.getLogger(DefaultDeliveryStateTest.class.getName()); + + private static final int BUFFER_SIZE = 4096; + + private final String _sourceAddress = getServer().containerId + "-link1-source"; + + @Test + public void testDefaultDeliveryState() throws Exception + { + LOGGER.fine(bold("======== About to create transports")); + + getClient().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); + + getServer().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); + + doOutputInputCycle(); + + getClient().connection = Proton.connection(); + getClient().transport.bind(getClient().connection); + + getServer().connection = Proton.connection(); + getServer().transport.bind(getServer().connection); + + + + LOGGER.fine(bold("======== About to open connections")); + getClient().connection.open(); + getServer().connection.open(); + + doOutputInputCycle(); + + + + LOGGER.fine(bold("======== About to open sessions")); + getClient().session = getClient().connection.session(); + getClient().session.open(); + + pumpClientToServer(); + + getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); + assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); + + getServer().session.open(); + assertEndpointState(getServer().session, ACTIVE, ACTIVE); + + pumpServerToClient(); + assertEndpointState(getClient().session, ACTIVE, ACTIVE); + + + LOGGER.fine(bold("======== About to create reciever")); + + getClient().source = new Source(); + getClient().source.setAddress(_sourceAddress); + + getClient().target = new Target(); + getClient().target.setAddress(null); + + getClient().receiver = getClient().session.receiver("link1"); + getClient().receiver.setTarget(getClient().target); + getClient().receiver.setSource(getClient().source); + + getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + + assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED); + + getClient().receiver.open(); + assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED); + + pumpClientToServer(); + + + LOGGER.fine(bold("======== About to set up implicitly created sender")); + + getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); + + getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode()); + getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode()); + + org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource(); + getServer().sender.setSource(serverRemoteSource); + + assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE); + getServer().sender.open(); + + assertEndpointState(getServer().sender, ACTIVE, ACTIVE); + + pumpServerToClient(); + + assertEndpointState(getClient().receiver, ACTIVE, ACTIVE); + + int messagCount = 3; + getClient().receiver.flow(messagCount); + + pumpClientToServer(); + + LOGGER.fine(bold("======== About to create messages and send to the client")); + + Delivery serverDelivery1 = sendMessageToClient("delivery1", "Msg1"); + Delivery serverDelivery2 = sendMessageToClient("delivery2", "Msg2"); + Delivery serverDelivery3 = sendMessageToClient("delivery3", "Msg3"); + + pumpServerToClient(); + + assertNull(serverDelivery1.getLocalState()); + assertNull(serverDelivery2.getLocalState()); + assertNull(serverDelivery3.getLocalState()); + + assertNull(serverDelivery1.getRemoteState()); + assertNull(serverDelivery2.getRemoteState()); + assertNull(serverDelivery3.getRemoteState()); + + LOGGER.fine(bold("======== About to process the messages on the client")); + + Delivery clientDelivery1 = receiveMessageFromServer("delivery1", "Msg1"); + Delivery clientDelivery2 = receiveMessageFromServer("delivery2", "Msg2"); + Delivery clientDelivery3 = receiveMessageFromServer("delivery3", "Msg3"); + + // Give them some default state + clientDelivery1.setDefaultDeliveryState(Released.getInstance()); + clientDelivery2.setDefaultDeliveryState(Released.getInstance()); + clientDelivery3.setDefaultDeliveryState(Released.getInstance()); + + assertEquals(Released.getInstance(), clientDelivery1.getDefaultDeliveryState()); + assertEquals(Released.getInstance(), clientDelivery2.getDefaultDeliveryState()); + assertEquals(Released.getInstance(), clientDelivery3.getDefaultDeliveryState()); + + // Check the default state doesn't influence the actual state + assertNull(clientDelivery1.getLocalState()); + assertNull(clientDelivery2.getLocalState()); + assertNull(clientDelivery3.getLocalState()); + + assertNull(clientDelivery1.getRemoteState()); + assertNull(clientDelivery2.getRemoteState()); + assertNull(clientDelivery3.getRemoteState()); + + // Accept one for real, update default on another, leave last untouched + clientDelivery1.disposition(Accepted.getInstance()); + clientDelivery2.setDefaultDeliveryState(new Modified()); + + // Confirm default and actual states have or have not changed as expected + assertEquals(Released.getInstance(), clientDelivery1.getDefaultDeliveryState()); + assertTrue(clientDelivery2.getDefaultDeliveryState() instanceof Modified); + assertEquals(Released.getInstance(), clientDelivery3.getDefaultDeliveryState()); + + assertEquals(Accepted.getInstance(), clientDelivery1.getLocalState()); + assertNull(clientDelivery2.getLocalState()); + assertNull(clientDelivery3.getLocalState()); + + // Verify the server gets intended state changes + pumpClientToServer(); + + assertEquals(Accepted.getInstance(), serverDelivery1.getRemoteState()); + assertNull(serverDelivery2.getRemoteState()); + assertNull(serverDelivery3.getRemoteState()); + + // Confirm server sees the default states for second and third + // messages when they get settled during link free + getClient().receiver.close(); + assertEndpointState(getClient().receiver, CLOSED, ACTIVE); + + pumpClientToServer(); + + assertEndpointState(getServer().sender, ACTIVE, CLOSED); + getServer().sender.close(); + + assertEndpointState(getServer().sender, CLOSED, CLOSED); + + pumpServerToClient(); + + getClient().receiver.free(); + + assertEndpointState(getClient().receiver, CLOSED, CLOSED); + + pumpClientToServer(); + + assertEquals(Accepted.getInstance(), serverDelivery1.getRemoteState()); + assertTrue(serverDelivery2.getRemoteState() instanceof Modified); + assertEquals(Released.getInstance(), serverDelivery3.getRemoteState()); + } + + private Delivery receiveMessageFromServer(String deliveryTag, String messageContent) + { + Delivery delivery = getClient().connection.getWorkHead(); + + assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag())); + assertEquals("The received delivery should be on our receiver", + getClient().receiver, delivery.getLink()); + + assertNull(delivery.getLocalState()); + assertNull(delivery.getRemoteState()); + + assertFalse(delivery.isPartial()); + assertTrue(delivery.isReadable()); + + byte[] received = new byte[BUFFER_SIZE]; + int len = getClient().receiver.recv(received, 0, BUFFER_SIZE); + + assertTrue("given array was too small", len < BUFFER_SIZE); + + Message m = Proton.message(); + m.decode(received, 0, len); + + Object messageBody = ((AmqpValue)m.getBody()).getValue(); + assertEquals("Unexpected message content", messageContent, messageBody); + + boolean receiverAdvanced = getClient().receiver.advance(); + assertTrue("receiver has not advanced", receiverAdvanced); + + return delivery; + } + + private Delivery sendMessageToClient(String deliveryTag, String messageBody) + { + byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8); + + Message m = Proton.message(); + m.setBody(new AmqpValue(messageBody)); + + byte[] encoded = new byte[BUFFER_SIZE]; + int len = m.encode(encoded, 0, BUFFER_SIZE); + + assertTrue("given array was too small", len < BUFFER_SIZE); + + Delivery serverDelivery = getServer().sender.delivery(tag); + int sent = getServer().sender.send(encoded, 0, len); + + assertEquals("sender unable to send all data at once as assumed for simplicity", len, sent); + + boolean senderAdvanced = getServer().sender.advance(); + assertTrue("sender has not advanced", senderAdvanced); + + return serverDelivery; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org