http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/LeakTestReactor.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/LeakTestReactor.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/LeakTestReactor.java deleted file mode 100644 index d06ec0a..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/LeakTestReactor.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor.impl; - -import java.io.IOException; -import java.nio.channels.Pipe; -import java.nio.channels.Pipe.SinkChannel; -import java.nio.channels.Pipe.SourceChannel; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.HashMap; -import java.util.Map.Entry; - -import junit.framework.AssertionFailedError; - -// Extends the Reactor to substitute a unit-test implementation of the -// IO class. This detects, and reports, situations where the reactor code -// fails to close one of the Java I/O related resources that it has created. -public class LeakTestReactor extends ReactorImpl { - - private static class TestIO implements IO { - - private final HashMap<Object, Exception> resources = new HashMap<Object, Exception>(); - - @Override - public Pipe pipe() throws IOException { - Pipe pipe = Pipe.open(); - resources.put(pipe.source(), new Exception()); - resources.put(pipe.sink(), new Exception()); - return pipe; - } - - @Override - public Selector selector() throws IOException { - Selector selector = Selector.open(); - resources.put(selector, new Exception()); - return selector; - - } - - @Override - public ServerSocketChannel serverSocketChannel() throws IOException { - ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); - resources.put(serverSocketChannel, new Exception()); - return serverSocketChannel; - } - - @Override - public SocketChannel socketChannel() throws IOException { - SocketChannel socketChannel = SocketChannel.open(); - resources.put(socketChannel, new Exception()); - return socketChannel; - } - - private boolean isOpen(Object resource) { - if (resource instanceof SourceChannel) { - return ((SourceChannel)resource).isOpen(); - } else if (resource instanceof SinkChannel) { - return ((SinkChannel)resource).isOpen(); - } else if (resource instanceof Selector) { - return ((Selector)resource).isOpen(); - } else if (resource instanceof ServerSocketChannel) { - return ((ServerSocketChannel)resource).isOpen(); - } else if (resource instanceof SocketChannel) { - return ((SocketChannel)resource).isOpen(); - } else { - throw new AssertionFailedError("Don't know how to check if this type is open: " + resource.getClass()); - } - } - - protected void assertNoLeaks() throws AssertionFailedError { - boolean fail = false; - for (Entry<Object, Exception> entry : resources.entrySet()) { - if (isOpen(entry.getKey())) { - System.out.println("Leaked an instance of '" + entry.getKey() + "' from:"); - entry.getValue().printStackTrace(System.out); - fail = true; - } - } - if (fail) { - throw new AssertionFailedError("Resources leaked"); - } - resources.clear(); - } - } - - private final TestIO testIO; - - public LeakTestReactor() throws IOException { - super(new TestIO()); - testIO = (TestIO)getIO(); - } - - public void assertNoLeaks() throws AssertionFailedError { - testIO.assertNoLeaks(); - } - -}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/test/java/org/apache/qpid/proton/systemtests/BinaryFormatter.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/BinaryFormatter.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/BinaryFormatter.java deleted file mode 100644 index 4f87626..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/BinaryFormatter.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.systemtests; - -public class BinaryFormatter -{ - - public String format(byte[] binaryData) - { - StringBuilder stringBuilder = new StringBuilder(); - for(int i = 0; i < binaryData.length; i++) - { - byte theByte = binaryData[i]; - String formattedByte = formatByte(theByte); - stringBuilder.append(formattedByte); - } - return stringBuilder.toString(); - } - - private String formatByte(byte theByte) - { - final String retVal; - if(Character.isLetterOrDigit(theByte)) - { - retVal = String.format("[ %c ]", theByte); - } - else - { - retVal = String.format("[x%02x]", theByte); - } - return retVal; - } - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/test/java/org/apache/qpid/proton/systemtests/BinaryFormatterTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/BinaryFormatterTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/BinaryFormatterTest.java deleted file mode 100644 index 6e4567b..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/BinaryFormatterTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.systemtests; - -import static org.junit.Assert.*; - -import org.junit.Test; - -public class BinaryFormatterTest -{ - - private BinaryFormatter _binaryFormatter = new BinaryFormatter(); - - @Test - public void testSingleCharacter() - { - assertEquals("[ A ]", _binaryFormatter.format("A".getBytes())); - } - - @Test - public void testSingleSmallNonCharacter() - { - byte[] bytes = new byte[] { (byte)0x1 }; - assertEquals("[x01]", _binaryFormatter.format(bytes)); - } - - @Test - public void testSingleLargeNonCharacter() - { - int numberToUse = 0xa2; - byte byteToUse = (byte)numberToUse; - byte[] bytes = new byte[] { byteToUse }; - assertEquals("[xa2]", _binaryFormatter.format(bytes)); - } - - @Test - public void testComplex() - { - byte[] binaryData = new byte[4]; - System.arraycopy("ABC".getBytes(), 0, binaryData, 0, 3); - binaryData[3] = (byte)0xff; - - String formattedString = _binaryFormatter.format(binaryData); - String expected = "[ A ][ B ][ C ][xff]"; - assertEquals(expected, formattedString); - } - - public void testFormatSubArray() - { - fail("TODO"); - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DefaultDeliveryStateTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DefaultDeliveryStateTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DefaultDeliveryStateTest.java deleted file mode 100644 index 198123d..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DefaultDeliveryStateTest.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.systemtests; - -import static java.util.EnumSet.of; -import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; -import static org.apache.qpid.proton.engine.EndpointState.CLOSED; -import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED; -import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.logging.Logger; - -import org.apache.qpid.proton.Proton; -import org.apache.qpid.proton.amqp.messaging.Accepted; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.amqp.messaging.Modified; -import org.apache.qpid.proton.amqp.messaging.Released; -import org.apache.qpid.proton.amqp.messaging.Source; -import org.apache.qpid.proton.amqp.messaging.Target; -import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; -import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.message.Message; -import org.junit.Test; - -public class DefaultDeliveryStateTest extends EngineTestBase -{ - private static final Logger LOGGER = Logger.getLogger(DefaultDeliveryStateTest.class.getName()); - - private static final int BUFFER_SIZE = 4096; - - private final String _sourceAddress = getServer().containerId + "-link1-source"; - - @Test - public void testDefaultDeliveryState() throws Exception - { - LOGGER.fine(bold("======== About to create transports")); - - getClient().transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); - - getServer().transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); - - doOutputInputCycle(); - - getClient().connection = Proton.connection(); - getClient().transport.bind(getClient().connection); - - getServer().connection = Proton.connection(); - getServer().transport.bind(getServer().connection); - - LOGGER.fine(bold("======== About to open connections")); - getClient().connection.open(); - getServer().connection.open(); - - doOutputInputCycle(); - - LOGGER.fine(bold("======== About to open sessions")); - getClient().session = getClient().connection.session(); - getClient().session.open(); - - pumpClientToServer(); - - getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); - assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); - - getServer().session.open(); - assertEndpointState(getServer().session, ACTIVE, ACTIVE); - - pumpServerToClient(); - assertEndpointState(getClient().session, ACTIVE, ACTIVE); - - LOGGER.fine(bold("======== About to create reciever")); - - getClient().source = new Source(); - getClient().source.setAddress(_sourceAddress); - - getClient().target = new Target(); - getClient().target.setAddress(null); - - getClient().receiver = getClient().session.receiver("link1"); - getClient().receiver.setTarget(getClient().target); - getClient().receiver.setSource(getClient().source); - - getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); - getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); - - assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED); - - getClient().receiver.open(); - assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED); - - pumpClientToServer(); - - LOGGER.fine(bold("======== About to set up implicitly created sender")); - - getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); - - getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode()); - getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode()); - - org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource(); - getServer().sender.setSource(serverRemoteSource); - - assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE); - getServer().sender.open(); - - assertEndpointState(getServer().sender, ACTIVE, ACTIVE); - - pumpServerToClient(); - - assertEndpointState(getClient().receiver, ACTIVE, ACTIVE); - - int messagCount = 3; - getClient().receiver.flow(messagCount); - - pumpClientToServer(); - - LOGGER.fine(bold("======== About to create messages and send to the client")); - - Delivery serverDelivery1 = sendMessageToClient("delivery1", "Msg1"); - Delivery serverDelivery2 = sendMessageToClient("delivery2", "Msg2"); - Delivery serverDelivery3 = sendMessageToClient("delivery3", "Msg3"); - - pumpServerToClient(); - - assertNull(serverDelivery1.getLocalState()); - assertNull(serverDelivery2.getLocalState()); - assertNull(serverDelivery3.getLocalState()); - - assertNull(serverDelivery1.getRemoteState()); - assertNull(serverDelivery2.getRemoteState()); - assertNull(serverDelivery3.getRemoteState()); - - LOGGER.fine(bold("======== About to process the messages on the client")); - - Delivery clientDelivery1 = receiveMessageFromServer("delivery1", "Msg1"); - Delivery clientDelivery2 = receiveMessageFromServer("delivery2", "Msg2"); - Delivery clientDelivery3 = receiveMessageFromServer("delivery3", "Msg3"); - - // Give them some default state - clientDelivery1.setDefaultDeliveryState(Released.getInstance()); - clientDelivery2.setDefaultDeliveryState(Released.getInstance()); - clientDelivery3.setDefaultDeliveryState(Released.getInstance()); - - assertEquals(Released.getInstance(), clientDelivery1.getDefaultDeliveryState()); - assertEquals(Released.getInstance(), clientDelivery2.getDefaultDeliveryState()); - assertEquals(Released.getInstance(), clientDelivery3.getDefaultDeliveryState()); - - // Check the default state doesn't influence the actual state - assertNull(clientDelivery1.getLocalState()); - assertNull(clientDelivery2.getLocalState()); - assertNull(clientDelivery3.getLocalState()); - - assertNull(clientDelivery1.getRemoteState()); - assertNull(clientDelivery2.getRemoteState()); - assertNull(clientDelivery3.getRemoteState()); - - // Accept one for real, update default on another, leave last untouched - clientDelivery1.disposition(Accepted.getInstance()); - clientDelivery2.setDefaultDeliveryState(new Modified()); - - // Confirm default and actual states have or have not changed as expected - assertEquals(Released.getInstance(), clientDelivery1.getDefaultDeliveryState()); - assertTrue(clientDelivery2.getDefaultDeliveryState() instanceof Modified); - assertEquals(Released.getInstance(), clientDelivery3.getDefaultDeliveryState()); - - assertEquals(Accepted.getInstance(), clientDelivery1.getLocalState()); - assertNull(clientDelivery2.getLocalState()); - assertNull(clientDelivery3.getLocalState()); - - // Verify the server gets intended state changes - pumpClientToServer(); - - assertEquals(Accepted.getInstance(), serverDelivery1.getRemoteState()); - assertNull(serverDelivery2.getRemoteState()); - assertNull(serverDelivery3.getRemoteState()); - - // Confirm server sees the default states for second and third - // messages when they get settled during link free - getClient().receiver.close(); - assertEndpointState(getClient().receiver, CLOSED, ACTIVE); - - pumpClientToServer(); - - assertEndpointState(getServer().sender, ACTIVE, CLOSED); - getServer().sender.close(); - - assertEndpointState(getServer().sender, CLOSED, CLOSED); - - pumpServerToClient(); - - getClient().receiver.free(); - - assertEndpointState(getClient().receiver, CLOSED, CLOSED); - - pumpClientToServer(); - - assertEquals(Accepted.getInstance(), serverDelivery1.getRemoteState()); - assertTrue(serverDelivery2.getRemoteState() instanceof Modified); - assertEquals(Released.getInstance(), serverDelivery3.getRemoteState()); - } - - private Delivery receiveMessageFromServer(String deliveryTag, String messageContent) - { - Delivery delivery = getClient().connection.getWorkHead(); - - assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag())); - assertEquals("The received delivery should be on our receiver", - getClient().receiver, delivery.getLink()); - - assertNull(delivery.getLocalState()); - assertNull(delivery.getRemoteState()); - - assertFalse(delivery.isPartial()); - assertTrue(delivery.isReadable()); - - byte[] received = new byte[BUFFER_SIZE]; - int len = getClient().receiver.recv(received, 0, BUFFER_SIZE); - - assertTrue("given array was too small", len < BUFFER_SIZE); - - Message m = Proton.message(); - m.decode(received, 0, len); - - Object messageBody = ((AmqpValue)m.getBody()).getValue(); - assertEquals("Unexpected message content", messageContent, messageBody); - - boolean receiverAdvanced = getClient().receiver.advance(); - assertTrue("receiver has not advanced", receiverAdvanced); - - return delivery; - } - - private Delivery sendMessageToClient(String deliveryTag, String messageBody) - { - byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8); - - Message m = Proton.message(); - m.setBody(new AmqpValue(messageBody)); - - byte[] encoded = new byte[BUFFER_SIZE]; - int len = m.encode(encoded, 0, BUFFER_SIZE); - - assertTrue("given array was too small", len < BUFFER_SIZE); - - Delivery serverDelivery = getServer().sender.delivery(tag); - int sent = getServer().sender.send(encoded, 0, len); - - assertEquals("sender unable to send all data at once as assumed for simplicity", len, sent); - - boolean senderAdvanced = getServer().sender.advance(); - assertTrue("sender has not advanced", senderAdvanced); - - return serverDelivery; - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DeliveryTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DeliveryTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DeliveryTest.java deleted file mode 100644 index 69db7c1..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DeliveryTest.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.systemtests; - -import static java.util.EnumSet.of; -import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; -import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED; -import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.logging.Logger; - -import org.apache.qpid.proton.Proton; -import org.apache.qpid.proton.amqp.UnsignedInteger; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.amqp.messaging.Source; -import org.apache.qpid.proton.amqp.messaging.Target; -import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; -import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.message.Message; -import org.junit.Test; - -public class DeliveryTest extends EngineTestBase -{ - private static final Logger LOGGER = Logger.getLogger(DeliveryTest.class.getName()); - - private static final int BUFFER_SIZE = 4096; - - private final String _sourceAddress = getServer().containerId + "-link1-source"; - - @Test - public void testMessageFormat() throws Exception - { - LOGGER.fine(bold("======== About to create transports")); - - getClient().transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); - - getServer().transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); - - doOutputInputCycle(); - - getClient().connection = Proton.connection(); - getClient().transport.bind(getClient().connection); - - getServer().connection = Proton.connection(); - getServer().transport.bind(getServer().connection); - - - LOGGER.fine(bold("======== About to open connections")); - getClient().connection.open(); - getServer().connection.open(); - - doOutputInputCycle(); - - - LOGGER.fine(bold("======== About to open sessions")); - getClient().session = getClient().connection.session(); - getClient().session.open(); - - pumpClientToServer(); - - getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); - assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); - - getServer().session.open(); - assertEndpointState(getServer().session, ACTIVE, ACTIVE); - - pumpServerToClient(); - assertEndpointState(getClient().session, ACTIVE, ACTIVE); - - - LOGGER.fine(bold("======== About to create reciever")); - - getClient().source = new Source(); - getClient().source.setAddress(_sourceAddress); - - getClient().target = new Target(); - getClient().target.setAddress(null); - - getClient().receiver = getClient().session.receiver("link1"); - getClient().receiver.setTarget(getClient().target); - getClient().receiver.setSource(getClient().source); - - getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); - getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); - - assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED); - - getClient().receiver.open(); - assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED); - - pumpClientToServer(); - - - LOGGER.fine(bold("======== About to set up implicitly created sender")); - - getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); - - getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode()); - getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode()); - - org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource(); - getServer().sender.setSource(serverRemoteSource); - - assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE); - getServer().sender.open(); - - assertEndpointState(getServer().sender, ACTIVE, ACTIVE); - - pumpServerToClient(); - - assertEndpointState(getClient().receiver, ACTIVE, ACTIVE); - - int messagCount = 4; - getClient().receiver.flow(messagCount); - - pumpClientToServer(); - - - LOGGER.fine(bold("======== About to create messages and send to the client")); - - sendMessageToClient("delivery1", "Msg1", null); // Don't set it, so it should be defaulted - sendMessageToClient("delivery2", "Msg2", 0); // Explicitly set it to the default - sendMessageToClient("delivery3", "Msg3", 1); - sendMessageToClient("delivery4", "Msg4", UnsignedInteger.MAX_VALUE.intValue()); // Limit - - pumpServerToClient(); - - LOGGER.fine(bold("======== About to process the messages on the client")); - - Delivery clientDelivery1 = receiveMessageFromServer("delivery1", "Msg1"); - Delivery clientDelivery2 = receiveMessageFromServer("delivery2", "Msg2"); - Delivery clientDelivery3 = receiveMessageFromServer("delivery3", "Msg3"); - Delivery clientDelivery4 = receiveMessageFromServer("delivery4", "Msg4"); - - // Verify the message format is as expected - assertEquals("Unexpected message format", 0, clientDelivery1.getMessageFormat()); - assertEquals("Unexpected message format", 0, clientDelivery2.getMessageFormat()); - assertEquals("Unexpected message format", 1, clientDelivery3.getMessageFormat()); - assertEquals("Unexpected message format", UnsignedInteger.MAX_VALUE.intValue(), clientDelivery4.getMessageFormat()); - } - - private Delivery receiveMessageFromServer(String deliveryTag, String messageContent) - { - Delivery delivery = getClient().connection.getWorkHead(); - - assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag())); - assertEquals("The received delivery should be on our receiver", - getClient().receiver, delivery.getLink()); - - assertNull(delivery.getLocalState()); - assertNull(delivery.getRemoteState()); - - assertFalse(delivery.isPartial()); - assertTrue(delivery.isReadable()); - - byte[] received = new byte[BUFFER_SIZE]; - int len = getClient().receiver.recv(received, 0, BUFFER_SIZE); - - assertTrue("given array was too small", len < BUFFER_SIZE); - - Message m = Proton.message(); - m.decode(received, 0, len); - - Object messageBody = ((AmqpValue)m.getBody()).getValue(); - assertEquals("Unexpected message content", messageContent, messageBody); - - boolean receiverAdvanced = getClient().receiver.advance(); - assertTrue("receiver has not advanced", receiverAdvanced); - - return delivery; - } - - private Delivery sendMessageToClient(String deliveryTag, String messageContent, Integer messageFormat) - { - byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8); - - Message m = Proton.message(); - m.setBody(new AmqpValue(messageContent)); - - byte[] encoded = new byte[BUFFER_SIZE]; - int len = m.encode(encoded, 0, BUFFER_SIZE); - - assertTrue("given array was too small", len < BUFFER_SIZE); - - Delivery serverDelivery = getServer().sender.delivery(tag); - - // Verify the default format of 0 is in place - assertEquals("Unexpected message format", 0, serverDelivery.getMessageFormat()); - - // Set the message format explicitly if given, or leave it at the default - if(messageFormat != null) { - serverDelivery.setMessageFormat(messageFormat); - } - - int sent = getServer().sender.send(encoded, 0, len); - - assertEquals("sender unable to send all data at once as assumed for simplicity", len, sent); - - boolean senderAdvanced = getServer().sender.advance(); - assertTrue("sender has not advanced", senderAdvanced); - - return serverDelivery; - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java deleted file mode 100644 index 9dadf29..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.systemtests; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.nio.ByteBuffer; -import java.util.logging.Logger; - -import org.apache.qpid.proton.amqp.messaging.Target; -import org.apache.qpid.proton.engine.Endpoint; -import org.apache.qpid.proton.engine.EndpointState; - -public abstract class EngineTestBase -{ - private static final Logger LOGGER = Logger.getLogger(EngineTestBase.class.getName()); - - private final TestLoggingHelper _testLoggingHelper = new TestLoggingHelper(LOGGER); - private final ProtonContainer _client = new ProtonContainer("clientContainer"); - private final ProtonContainer _server = new ProtonContainer("serverContainer"); - - protected TestLoggingHelper getTestLoggingHelper() - { - return _testLoggingHelper; - } - - protected ProtonContainer getClient() - { - return _client; - } - - protected ProtonContainer getServer() - { - return _server; - } - - protected void assertClientHasNothingToOutput() - { - assertEquals(0, getClient().transport.getOutputBuffer().remaining()); - getClient().transport.outputConsumed(); - } - - protected void pumpServerToClient() - { - ByteBuffer serverBuffer = getServer().transport.getOutputBuffer(); - - getTestLoggingHelper().prettyPrint(" <<<" + TestLoggingHelper.SERVER_PREFIX + " ", serverBuffer); - assertTrue("Server expected to produce some output", serverBuffer.hasRemaining()); - - ByteBuffer clientBuffer = getClient().transport.getInputBuffer(); - - clientBuffer.put(serverBuffer); - - assertEquals("Client expected to consume all server's output", 0, serverBuffer.remaining()); - - getClient().transport.processInput().checkIsOk(); - getServer().transport.outputConsumed(); - } - - protected void pumpClientToServer() - { - ByteBuffer clientBuffer = getClient().transport.getOutputBuffer(); - - getTestLoggingHelper().prettyPrint(TestLoggingHelper.CLIENT_PREFIX + ">>> ", clientBuffer); - assertTrue("Client expected to produce some output", clientBuffer.hasRemaining()); - - ByteBuffer serverBuffer = getServer().transport.getInputBuffer(); - - serverBuffer.put(clientBuffer); - - assertEquals("Server expected to consume all client's output", 0, clientBuffer.remaining()); - - getClient().transport.outputConsumed(); - getServer().transport.processInput().checkIsOk(); - } - - protected void doOutputInputCycle() throws Exception - { - pumpClientToServer(); - - pumpServerToClient(); - } - - protected void assertEndpointState(Endpoint endpoint, EndpointState localState, EndpointState remoteState) - { - assertEquals(localState, endpoint.getLocalState()); - assertEquals(remoteState, endpoint.getRemoteState()); - } - - protected void assertTerminusEquals(org.apache.qpid.proton.amqp.transport.Target expectedTarget, org.apache.qpid.proton.amqp.transport.Target actualTarget) - { - assertEquals( - ((Target)expectedTarget).getAddress(), - ((Target)actualTarget).getAddress()); - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java deleted file mode 100644 index 7687206..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.systemtests; - -import static java.util.EnumSet.of; -import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; -import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED; -import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; - -import java.util.logging.Logger; - -import org.apache.qpid.proton.Proton; -import org.apache.qpid.proton.amqp.messaging.Source; -import org.apache.qpid.proton.amqp.messaging.Target; -import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; -import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.engine.Session; -import org.junit.Test; - -public class FreeTest extends EngineTestBase -{ - private static final Logger LOGGER = Logger.getLogger(FreeTest.class.getName()); - - @Test - public void testFreeConnectionWithMultipleSessionsAndSendersAndReceiversDoesNotThrowCME() throws Exception - { - LOGGER.fine(bold("======== About to create transports")); - - getClient().transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); - - getServer().transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); - - getClient().connection = Proton.connection(); - getClient().transport.bind(getClient().connection); - - getServer().connection = Proton.connection(); - getServer().transport.bind(getServer().connection); - - - - LOGGER.fine(bold("======== About to open connections")); - getClient().connection.open(); - getServer().connection.open(); - - doOutputInputCycle(); - - - - LOGGER.fine(bold("======== About to open sessions")); - getClient().session = getClient().connection.session(); - getClient().session.open(); - - Session clientSession2 = getClient().connection.session(); - clientSession2.open(); - - pumpClientToServer(); - - getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); - assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); - - getServer().session.open(); - assertEndpointState(getServer().session, ACTIVE, ACTIVE); - - Session serverSession2 = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); - assertNotNull("Engine did not return expected second server session", serverSession2); - assertNotSame("Engine did not return expected second server session", serverSession2, getServer().session); - serverSession2.open(); - - pumpServerToClient(); - assertEndpointState(getClient().session, ACTIVE, ACTIVE); - assertEndpointState(clientSession2, ACTIVE, ACTIVE); - - - - LOGGER.fine(bold("======== About to create client senders")); - - getClient().source = new Source(); - getClient().source.setAddress(null); - - getClient().target = new Target(); - getClient().target.setAddress("myQueue"); - - getClient().sender = getClient().session.sender("sender1"); - getClient().sender.setTarget(getClient().target); - getClient().sender.setSource(getClient().source); - - getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); - getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); - - assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED); - - getClient().sender.open(); - assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED); - - - Sender clientSender2 = getClient().session.sender("sender2"); - clientSender2.setTarget(getClient().target); - clientSender2.setSource(getClient().source); - - clientSender2.setSenderSettleMode(SenderSettleMode.UNSETTLED); - clientSender2.setReceiverSettleMode(ReceiverSettleMode.FIRST); - - assertEndpointState(clientSender2, UNINITIALIZED, UNINITIALIZED); - - clientSender2.open(); - assertEndpointState(clientSender2, ACTIVE, UNINITIALIZED); - - pumpClientToServer(); - - - LOGGER.fine(bold("======== About to set up server receivers")); - - getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); - // Accept the settlement modes suggested by the client - getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode()); - getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode()); - - org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget(); - assertTerminusEquals(getClient().target, serverRemoteTarget); - - getServer().receiver.setTarget(serverRemoteTarget); - - assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE); - getServer().receiver.open(); - - assertEndpointState(getServer().receiver, ACTIVE, ACTIVE); - - Receiver serverReceiver2 = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); - serverReceiver2.open(); - assertEndpointState(serverReceiver2, ACTIVE, ACTIVE); - - pumpServerToClient(); - assertEndpointState(getClient().sender, ACTIVE, ACTIVE); - assertEndpointState(clientSender2, ACTIVE, ACTIVE); - - - - LOGGER.fine(bold("======== About to create client receivers")); - - Source src = new Source(); - src.setAddress("myQueue"); - - Target tgt1 = new Target(); - tgt1.setAddress("receiver1"); - - getClient().receiver = getClient().session.receiver("receiver1"); - getClient().receiver.setSource(src); - getClient().receiver.setTarget(tgt1); - - getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); - getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); - - assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED); - - getClient().receiver.open(); - assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED); - - - Target tgt2 = new Target(); - tgt1.setAddress("receiver2"); - - Receiver clientReceiver2 = getClient().session.receiver("receiver2"); - clientReceiver2.setSource(src); - clientReceiver2.setTarget(tgt2); - - clientReceiver2.setSenderSettleMode(SenderSettleMode.UNSETTLED); - clientReceiver2.setReceiverSettleMode(ReceiverSettleMode.FIRST); - - assertEndpointState(clientReceiver2, UNINITIALIZED, UNINITIALIZED); - - clientReceiver2.open(); - assertEndpointState(clientReceiver2, ACTIVE, UNINITIALIZED); - - pumpClientToServer(); - - - - LOGGER.fine(bold("======== About to set up server senders")); - - getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); - // Accept the settlement modes suggested by the client - getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode()); - getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode()); - - org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget2 = getServer().sender.getRemoteTarget(); - assertTerminusEquals(tgt1, serverRemoteTarget2); - - getServer().sender.setTarget(serverRemoteTarget2); - - assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE); - getServer().sender.open(); - assertEndpointState(getServer().sender, ACTIVE, ACTIVE); - - Sender serverSender2 = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); - - serverRemoteTarget2 = serverSender2.getRemoteTarget(); - assertTerminusEquals(tgt2, serverRemoteTarget2); - serverSender2.setTarget(serverRemoteTarget2); - serverSender2.open(); - assertEndpointState(serverSender2, ACTIVE, ACTIVE); - - pumpServerToClient(); - assertEndpointState(getClient().receiver, ACTIVE, ACTIVE); - assertEndpointState(clientReceiver2, ACTIVE, ACTIVE); - - - - LOGGER.fine(bold("======== About to close and free client's connection")); - - getClient().connection.close(); - getClient().connection.free(); - } - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java deleted file mode 100644 index 518960c..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java +++ /dev/null @@ -1,494 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.systemtests; - -import static java.util.EnumSet.of; -import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; -import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED; -import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.logging.Logger; - -import org.apache.qpid.proton.Proton; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.amqp.messaging.Source; -import org.apache.qpid.proton.amqp.messaging.Target; -import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; -import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Link; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.message.Message; -import org.junit.Test; - -public class LinkTest extends EngineTestBase -{ - private static final Logger LOGGER = Logger.getLogger(LinkTest.class.getName()); - - private static final int BUFFER_SIZE = 4096; - - private static final Symbol RCV_PROP = Symbol.valueOf("ReceiverPropName"); - private static final Integer RCV_PROP_VAL = 1234; - private static final Symbol SND_PROP = Symbol.valueOf("SenderPropName"); - private static final Integer SND_PROP_VAL = 5678; - - private final String _sourceAddress = getServer().containerId + "-link1-source"; - - @Test - public void testCapabilities() throws Exception - { - final Symbol recvOfferedCap = Symbol.valueOf("recvOfferedCapability"); - final Symbol recvDesiredCap = Symbol.valueOf("recvDesiredCapability"); - final Symbol senderOfferedCap = Symbol.valueOf("senderOfferedCapability"); - final Symbol senderDesiredCap = Symbol.valueOf("senderDesiredCapability"); - - Symbol[] clientOfferedCapabilities = new Symbol[] { recvOfferedCap }; - Symbol[] clientDesiredCapabilities = new Symbol[] { recvDesiredCap }; - - Symbol[] serverOfferedCapabilities = new Symbol[] { senderOfferedCap }; - Symbol[] serverDesiredCapabilities = new Symbol[] { senderDesiredCap }; - - LOGGER.fine(bold("======== About to create transports")); - - getClient().transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); - - getServer().transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); - - doOutputInputCycle(); - - getClient().connection = Proton.connection(); - getClient().transport.bind(getClient().connection); - - getServer().connection = Proton.connection(); - getServer().transport.bind(getServer().connection); - - LOGGER.fine(bold("======== About to open connections")); - getClient().connection.open(); - getServer().connection.open(); - - doOutputInputCycle(); - - LOGGER.fine(bold("======== About to open sessions")); - getClient().session = getClient().connection.session(); - getClient().session.open(); - - pumpClientToServer(); - - getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); - assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); - - getServer().session.open(); - assertEndpointState(getServer().session, ACTIVE, ACTIVE); - - pumpServerToClient(); - assertEndpointState(getClient().session, ACTIVE, ACTIVE); - - LOGGER.fine(bold("======== About to create reciever")); - - getClient().source = new Source(); - getClient().source.setAddress(_sourceAddress); - - getClient().target = new Target(); - getClient().target.setAddress(null); - - getClient().receiver = getClient().session.receiver("link1"); - getClient().receiver.setTarget(getClient().target); - getClient().receiver.setSource(getClient().source); - - getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); - getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); - - // Set the client receivers capabilities - getClient().receiver.setOfferedCapabilities(clientOfferedCapabilities); - getClient().receiver.setDesiredCapabilities(clientDesiredCapabilities); - - assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED); - - getClient().receiver.open(); - assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED); - - pumpClientToServer(); - - LOGGER.fine(bold("======== About to set up implicitly created sender")); - - getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); - - getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode()); - getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode()); - - org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource(); - getServer().sender.setSource(serverRemoteSource); - - // Set the server senders capabilities - getServer().sender.setOfferedCapabilities(serverOfferedCapabilities); - getServer().sender.setDesiredCapabilities(serverDesiredCapabilities); - - assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE); - getServer().sender.open(); - - assertEndpointState(getServer().sender, ACTIVE, ACTIVE); - - pumpServerToClient(); - - assertEndpointState(getClient().receiver, ACTIVE, ACTIVE); - - // Verify server side got the clients receiver capabilities as expected - Symbol[] serverRemoteOfferedCapabilities = getServer().sender.getRemoteOfferedCapabilities(); - assertNotNull("Server had no remote offered capabilities", serverRemoteOfferedCapabilities); - assertEquals("Server remote offered capabilities not expected size", 1, serverRemoteOfferedCapabilities.length); - assertTrue("Server remote offered capabilities lack expected value: " + recvOfferedCap, Arrays.asList(serverRemoteOfferedCapabilities).contains(recvOfferedCap)); - - Symbol[] serverRemoteDesiredCapabilities = getServer().sender.getRemoteDesiredCapabilities(); - assertNotNull("Server had no remote desired capabilities", serverRemoteDesiredCapabilities); - assertEquals("Server remote desired capabilities not expected size", 1, serverRemoteDesiredCapabilities.length); - assertTrue("Server remote desired capabilities lack expected value: " + recvDesiredCap, Arrays.asList(serverRemoteDesiredCapabilities).contains(recvDesiredCap)); - - // Verify the client side got the servers sender capabilities as expected - Symbol[] clientRemoteOfferedCapabilities = getClient().receiver.getRemoteOfferedCapabilities(); - assertNotNull("Client had no remote offered capabilities", clientRemoteOfferedCapabilities); - assertEquals("Client remote offered capabilities not expected size", 1, clientRemoteOfferedCapabilities.length); - assertTrue("Client remote offered capabilities lack expected value: " + senderOfferedCap, Arrays.asList(clientRemoteOfferedCapabilities).contains(senderOfferedCap)); - - Symbol[] clientRemoteDesiredCapabilities = getClient().receiver.getRemoteDesiredCapabilities(); - assertNotNull("Client had no remote desired capabilities", clientRemoteDesiredCapabilities); - assertEquals("Client remote desired capabilities not expected size", 1, clientRemoteDesiredCapabilities.length); - assertTrue("Client remote desired capabilities lack expected value: " + senderDesiredCap, Arrays.asList(clientRemoteDesiredCapabilities).contains(senderDesiredCap)); - } - - @Test - public void testProperties() throws Exception - { - Map<Symbol, Object> receiverProps = new HashMap<>(); - receiverProps.put(RCV_PROP, RCV_PROP_VAL); - - Map<Symbol, Object> senderProps = new HashMap<>(); - senderProps.put(SND_PROP, SND_PROP_VAL); - - LOGGER.fine(bold("======== About to create transports")); - - getClient().transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); - - getServer().transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); - - doOutputInputCycle(); - - getClient().connection = Proton.connection(); - getClient().transport.bind(getClient().connection); - - getServer().connection = Proton.connection(); - getServer().transport.bind(getServer().connection); - - LOGGER.fine(bold("======== About to open connections")); - getClient().connection.open(); - getServer().connection.open(); - - doOutputInputCycle(); - - LOGGER.fine(bold("======== About to open sessions")); - getClient().session = getClient().connection.session(); - getClient().session.open(); - - pumpClientToServer(); - - getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); - assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); - - getServer().session.open(); - assertEndpointState(getServer().session, ACTIVE, ACTIVE); - - pumpServerToClient(); - assertEndpointState(getClient().session, ACTIVE, ACTIVE); - - LOGGER.fine(bold("======== About to create reciever")); - - getClient().source = new Source(); - getClient().source.setAddress(_sourceAddress); - - getClient().target = new Target(); - getClient().target.setAddress(null); - - getClient().receiver = getClient().session.receiver("link1"); - getClient().receiver.setTarget(getClient().target); - getClient().receiver.setSource(getClient().source); - - getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); - getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); - - // Set the recievers properties - getClient().receiver.setProperties(receiverProps); - - assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED); - - getClient().receiver.open(); - assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED); - - pumpClientToServer(); - - LOGGER.fine(bold("======== About to set up implicitly created sender")); - - getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); - - getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode()); - getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode()); - - org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource(); - getServer().sender.setSource(serverRemoteSource); - - // Set the senders properties - getServer().sender.setProperties(senderProps); - - assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE); - getServer().sender.open(); - - assertEndpointState(getServer().sender, ACTIVE, ACTIVE); - - pumpServerToClient(); - - assertEndpointState(getClient().receiver, ACTIVE, ACTIVE); - - // Verify server side got the clients receiver properties as expected - Map<Symbol, Object> serverRemoteProperties = getServer().sender.getRemoteProperties(); - assertNotNull("Server had no remote properties", serverRemoteProperties); - assertEquals("Server remote properties not expected size", 1, serverRemoteProperties.size()); - assertTrue("Server remote properties lack expected key: " + RCV_PROP, serverRemoteProperties.containsKey(RCV_PROP)); - assertEquals("Server remote properties contain unexpected value for key: " + RCV_PROP, RCV_PROP_VAL, serverRemoteProperties.get(RCV_PROP)); - - // Verify the client side got the servers sender properties as expected - Map<Symbol, Object> clientRemoteProperties = getClient().receiver.getRemoteProperties(); - assertNotNull("Client had no remote properties", clientRemoteProperties); - assertEquals("Client remote properties not expected size", 1, clientRemoteProperties.size()); - assertTrue("Client remote properties lack expected key: " + SND_PROP, clientRemoteProperties.containsKey(SND_PROP)); - assertEquals("Client remote properties contain unexpected value for key: " + SND_PROP, SND_PROP_VAL, clientRemoteProperties.get(SND_PROP)); - } - - /** - * Test the {@link Link#getCredit()}, {@link Link#getQueued()}, and - * {@link Link#getRemoteCredit()} methods behave as expected when sending - * from server and receiving on client links. - * - * @throws Exception - * if something unexpected occurs - */ - @Test - public void testLinkCreditState() throws Exception - { - LOGGER.fine(bold("======== About to create transports")); - - getClient().transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); - - getServer().transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); - - doOutputInputCycle(); - - getClient().connection = Proton.connection(); - getClient().transport.bind(getClient().connection); - - getServer().connection = Proton.connection(); - getServer().transport.bind(getServer().connection); - - LOGGER.fine(bold("======== About to open connections")); - getClient().connection.open(); - getServer().connection.open(); - - doOutputInputCycle(); - - LOGGER.fine(bold("======== About to open sessions")); - getClient().session = getClient().connection.session(); - getClient().session.open(); - - pumpClientToServer(); - - getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); - assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); - - getServer().session.open(); - assertEndpointState(getServer().session, ACTIVE, ACTIVE); - - pumpServerToClient(); - assertEndpointState(getClient().session, ACTIVE, ACTIVE); - - LOGGER.fine(bold("======== About to create reciever")); - - getClient().source = new Source(); - getClient().source.setAddress(_sourceAddress); - - getClient().target = new Target(); - getClient().target.setAddress(null); - - getClient().receiver = getClient().session.receiver("link1"); - getClient().receiver.setTarget(getClient().target); - getClient().receiver.setSource(getClient().source); - - getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); - getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); - - assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED); - - getClient().receiver.open(); - assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED); - - pumpClientToServer(); - - LOGGER.fine(bold("======== About to set up implicitly created sender")); - - getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); - - getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode()); - getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode()); - - org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource(); - getServer().sender.setSource(serverRemoteSource); - - assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE); - getServer().sender.open(); - - assertEndpointState(getServer().sender, ACTIVE, ACTIVE); - - pumpServerToClient(); - - assertEndpointState(getClient().receiver, ACTIVE, ACTIVE); - - LOGGER.fine(bold("======== About to flow credit")); - - // Verify credit state - assertLinkCreditState(getServer().sender, 0, 0, 0); - assertLinkCreditState(getClient().receiver, 0, 0, 0); - - int messagCount = 4; - getClient().receiver.flow(messagCount); - - // Verify credit state - assertLinkCreditState(getServer().sender, 0, 0, 0); - assertLinkCreditState(getClient().receiver, 4, 0, 4); - - pumpClientToServer(); - - // Verify credit state - assertLinkCreditState(getServer().sender, 4, 0, 4); - assertLinkCreditState(getClient().receiver, 4, 0, 4); - - LOGGER.fine(bold("======== About to create messages and send to the client")); - - // 'Send' and verify credit state - sendMessageToClient("delivery1", "Msg1"); - assertLinkCreditState(getServer().sender, 3, 1, 3); - assertLinkCreditState(getClient().receiver, 4, 0, 4); - - // 'Send' and verify credit state - sendMessageToClient("delivery2", "Msg2"); - assertLinkCreditState(getServer().sender, 2, 2, 2); - assertLinkCreditState(getClient().receiver, 4, 0, 4); - - // Pump to the client to send messages 'on the wire', verify new state, process messages - pumpServerToClient(); - - LOGGER.fine(bold("======== About to process the messages on the client")); - - assertLinkCreditState(getServer().sender, 2, 0, 2); - assertLinkCreditState(getClient().receiver, 4, 2, 2); - - receiveMessageFromServer("delivery1", "Msg1"); - - assertLinkCreditState(getServer().sender, 2, 0, 2); - assertLinkCreditState(getClient().receiver, 3, 1, 2); - - receiveMessageFromServer("delivery2", "Msg2"); - - assertLinkCreditState(getServer().sender, 2, 0, 2); - assertLinkCreditState(getClient().receiver, 2, 0, 2); - } - - void assertLinkCreditState(Link link, int credit, int queued, int remoteCredit) - { - assertEquals("Unexpected credit", credit, link.getCredit()); - assertEquals("Unexpected queued", queued, link.getQueued()); - assertEquals("Unexpected remote credit", remoteCredit, link.getRemoteCredit()); - } - - private Delivery receiveMessageFromServer(String deliveryTag, String messageContent) - { - Delivery delivery = getClient().connection.getWorkHead(); - - assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag())); - assertEquals("The received delivery should be on our receiver", - getClient().receiver, delivery.getLink()); - - assertNull(delivery.getLocalState()); - assertNull(delivery.getRemoteState()); - - assertFalse(delivery.isPartial()); - assertTrue(delivery.isReadable()); - - byte[] received = new byte[BUFFER_SIZE]; - int len = getClient().receiver.recv(received, 0, BUFFER_SIZE); - - assertTrue("given array was too small", len < BUFFER_SIZE); - - Message m = Proton.message(); - m.decode(received, 0, len); - - Object messageBody = ((AmqpValue)m.getBody()).getValue(); - assertEquals("Unexpected message content", messageContent, messageBody); - - boolean receiverAdvanced = getClient().receiver.advance(); - assertTrue("receiver has not advanced", receiverAdvanced); - - return delivery; - } - - private Delivery sendMessageToClient(String deliveryTag, String messageContent) - { - byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8); - - Message m = Proton.message(); - m.setBody(new AmqpValue(messageContent)); - - byte[] encoded = new byte[BUFFER_SIZE]; - int len = m.encode(encoded, 0, BUFFER_SIZE); - - assertTrue("given array was too small", len < BUFFER_SIZE); - - Delivery serverDelivery = getServer().sender.delivery(tag); - - int sent = getServer().sender.send(encoded, 0, len); - - assertEquals("sender unable to send all data at once as assumed for simplicity", len, sent); - - boolean senderAdvanced = getServer().sender.advance(); - assertTrue("sender has not advanced", senderAdvanced); - - return serverDelivery; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtocolTracerEnabler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtocolTracerEnabler.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtocolTracerEnabler.java deleted file mode 100644 index a26afa8..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtocolTracerEnabler.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.systemtests; - -import java.lang.reflect.Constructor; -import java.lang.reflect.Method; -import java.util.logging.Logger; - -import org.apache.qpid.proton.engine.Transport; - -public class ProtocolTracerEnabler -{ - private static final Logger LOGGER = Logger.getLogger(ProtocolTracerEnabler.class.getName()); - - private static final String LOGGING_PROTOCOL_TRACER_CLASS_NAME = "org.apache.qpid.proton.logging.LoggingProtocolTracer"; - - /** - * Attempts to set up a {@value #LOGGING_PROTOCOL_TRACER_CLASS_NAME} on the supplied transport. - * Uses reflection so this code can be run without a compile-time dependency on proton-j-impl. - */ - public static void setProtocolTracer(Transport transport, String prefix) - { - try - { - Class<?> loggingProtocolTracerClass = Class.forName(LOGGING_PROTOCOL_TRACER_CLASS_NAME); - - Constructor<?> loggingProtocolTracerConstructor = loggingProtocolTracerClass.getConstructor(String.class); - Object newLoggingProtocolTracer = loggingProtocolTracerConstructor.newInstance(prefix); - - Class<?> protocolTracerClass = Class.forName("org.apache.qpid.proton.engine.impl.ProtocolTracer"); - Method setPrococolTracerMethod = transport.getClass().getMethod("setProtocolTracer", protocolTracerClass); - - setPrococolTracerMethod.invoke(transport, newLoggingProtocolTracer); - } - catch(Exception e) - { - if(e instanceof ClassNotFoundException || e instanceof NoSuchMethodException) - { - LOGGER.fine("Protocol tracing disabled because unable to reflectively set a " - + LOGGING_PROTOCOL_TRACER_CLASS_NAME + " instance on the supplied transport which is a: " - + transport.getClass().getName()); - } - else - { - throw new RuntimeException("Unable to set up protocol tracing", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonContainer.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonContainer.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonContainer.java deleted file mode 100644 index d24d1ce..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonContainer.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.systemtests; - -import org.apache.qpid.proton.amqp.messaging.Source; -import org.apache.qpid.proton.amqp.messaging.Target; -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.engine.Session; -import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.message.Message; - -/** - * Simple "struct" class used to hold related Engine objects - */ -public class ProtonContainer -{ - String containerId; - Connection connection; - Transport transport; - Session session; - Sender sender; - Receiver receiver; - Source source; - Target target; - Delivery delivery; - Message message; - byte[] messageData; - - public ProtonContainer(String containerId) - { - this.containerId = containerId; - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org