Repository: qpid-jms Updated Branches: refs/heads/master 8e3f1bd50 -> b95ac58df
Adds some basic work on a replacement transport which uses Netty directly instead of Vert.x also adds some tests Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b95ac58d Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b95ac58d Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b95ac58d Branch: refs/heads/master Commit: b95ac58df115c4ba0ce57f2e9585de5571756845 Parents: 8e3f1bd Author: Timothy Bish <tabish...@gmail.com> Authored: Wed Jan 7 13:50:36 2015 -0500 Committer: Timothy Bish <tabish...@gmail.com> Committed: Wed Jan 7 13:50:36 2015 -0500 ---------------------------------------------------------------------- .../qpid/jms/transports/NettyTcpTransport.java | 204 +++++++++++++++++++ .../jms/transports/TcpTransportOptions.java | 153 ++++++++++++++ .../java/org/apache/qpid/jms/test/Wait.java | 48 +++++ .../qpid/jms/test/netty/NettyEchoServer.java | 132 ++++++++++++ .../jms/test/netty/NettyTcpTransportTest.java | 185 +++++++++++++++++ 5 files changed, 722 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java new file mode 100644 index 0000000..ebd385d --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.ReferenceCountUtil; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.jms.util.IOExceptionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.vertx.java.core.buffer.Buffer; +import org.vertx.java.core.net.impl.PartialPooledByteBufAllocator; + +/** + * TCP based transport that uses Netty as the underlying IO layer. + */ +public class NettyTcpTransport implements Transport { + + private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class); + + private Bootstrap bootstrap; + private EventLoopGroup group; + private Channel channel; + private TransportListener listener; + private final TcpTransportOptions options; + private final URI remote; + + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); + + /** + * Create a new transport instance + * + * @param options + * the transport options used to configure the socket connection. + */ + public NettyTcpTransport(TransportListener listener, URI remoteLocation, TcpTransportOptions options) { + this.options = options; + this.listener = listener; + this.remote = remoteLocation; + } + + @Override + public void connect() throws IOException { + + if (listener == null) { + throw new IllegalStateException("A transport listener must be set before connection attempts."); + } + + group = new NioEventLoopGroup(); + + bootstrap = new Bootstrap(); + bootstrap.group(group); + bootstrap.channel(NioSocketChannel.class); + bootstrap.handler(new ChannelInitializer<Channel>() { + + @Override + public void initChannel(Channel connectedChannel) throws Exception { + channel = connectedChannel; + channel.pipeline().addLast(new NettyTcpTransportHandler()); + } + }); + + configureNetty(bootstrap, options); + + ChannelFuture future = bootstrap.connect(remote.getHost(), remote.getPort()); + future.awaitUninterruptibly(); + + if (future.isCancelled()) { + throw new IOException("Connection attempt was cancelled"); + } else if (!future.isSuccess()) { + throw IOExceptionSupport.create(future.cause()); + } else { + connected.set(true); + } + } + + @Override + public boolean isConnected() { + return connected.get(); + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + channel.close(); + group.shutdownGracefully(); + } + } + + @Override + public void send(ByteBuffer output) throws IOException { + send(Unpooled.wrappedBuffer(output)); + } + + @Override + public void send(ByteBuf output) throws IOException { + channel.write(output); + channel.flush(); + } + + @Override + public TransportListener getTransportListener() { + return listener; + } + + @Override + public void setTransportListener(TransportListener listener) { + this.listener = listener; + } + + //----- Internal implementation details ----------------------------------// + + protected void configureNetty(Bootstrap bootstrap, TcpTransportOptions options) { + bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); + bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); + bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); + bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); + + if (options.getSendBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); + } + + if (options.getReceiveBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_RCVBUF, options.getSendBufferSize()); + bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getSendBufferSize())); + } + + if (options.getTrafficClass() != -1) { + bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass()); + } + } + + //----- Handle connection events -----------------------------------------// + + private class NettyTcpTransportHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelActive(ChannelHandlerContext context) throws Exception { + LOG.info("Channel has become active! Channel is {}", context.channel()); + } + + @Override + public void channelRead(ChannelHandlerContext context, Object inbound) throws Exception { + ByteBuf buffer = (ByteBuf) inbound; + LOG.info("New data read: {} bytes incoming", buffer.readableBytes()); + try { + listener.onData(new Buffer(buffer)); + } finally { + ReferenceCountUtil.release(inbound); + } + } + + @Override + public void channelInactive(ChannelHandlerContext context) throws Exception { + LOG.info("Channel has gone inactive! Channel is {}", context.channel()); + if (!closed.get()) { + connected.set(false); + listener.onTransportClosed(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { + LOG.info("Exception on channel! Channel is {}", context.channel()); + if (!closed.get()) { + connected.set(false); + listener.onTransportError(cause); + } + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java new file mode 100644 index 0000000..e5f90c3 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports; + +/** + * Encapsulates all the TCP Transport options in one configuration object. + */ +public class TcpTransportOptions { + + public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024; + public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE; + public static final int DEFAULT_TRAFFIC_CLASS = 0; + public static final boolean DEFAULT_TCP_NO_DELAY = true; + public static final boolean DEFAULT_TCP_KEEP_ALIVE = false; + public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE; + public static final int DEFAULT_SO_TIMEOUT = -1; + public static final int DEFAULT_CONNECT_TIMEOUT = 60000; + + private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE; + private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE; + private int trafficClass = DEFAULT_TRAFFIC_CLASS; + private int connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private int soTimeout = DEFAULT_SO_TIMEOUT; + private int soLinger = DEFAULT_SO_LINGER; + private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE; + private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; + + /** + * @return the currently set send buffer size in bytes. + */ + public int getSendBufferSize() { + return sendBufferSize; + } + + /** + * Sets the send buffer size in bytes, the value must be greater than zero + * or an {@link IllegalArgumentException} will be thrown. + * + * @param sendBufferSize + * the new send buffer size for the TCP Transport. + * + * @throws IllegalArgumentException if the value given is not in the valid range. + */ + public void setSendBufferSize(int sendBufferSize) { + if (sendBufferSize <= 0) { + throw new IllegalArgumentException("The send buffer size must be > 0"); + } + + this.sendBufferSize = sendBufferSize; + } + + /** + * @return the currently configured receive buffer size in bytes. + */ + public int getReceiveBufferSize() { + return receiveBufferSize; + } + + /** + * Sets the receive buffer size in bytes, the value must be greater than zero + * or an {@link IllegalArgumentException} will be thrown. + * + * @param receiveBufferSize + * the new receive buffer size for the TCP Transport. + * + * @throws IllegalArgumentException if the value given is not in the valid range. + */ + public void setReceiveBufferSize(int receiveBufferSize) { + if (receiveBufferSize <= 0) { + throw new IllegalArgumentException("The send buffer size must be > 0"); + } + + this.receiveBufferSize = receiveBufferSize; + } + + /** + * @return the currently configured traffic class value. + */ + public int getTrafficClass() { + return trafficClass; + } + + /** + * Sets the traffic class value used by the TCP connection, valid + * range is between 0 and 255. + * + * @param trafficClass + * the new traffic class value. + * + * @throws IllegalArgumentException if the value given is not in the valid range. + */ + public void setTrafficClass(int trafficClass) { + if (trafficClass < 0 || trafficClass > 255) { + throw new IllegalArgumentException("Traffic class must be in the range [0..255]"); + } + + this.trafficClass = trafficClass; + } + + public int getSoTimeout() { + return soTimeout; + } + + public void setSoTimeout(int soTimeout) { + this.soTimeout = soTimeout; + } + + public boolean isTcpNoDelay() { + return tcpNoDelay; + } + + public void setTcpNoDelay(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public int getSoLinger() { + return soLinger; + } + + public void setSoLinger(int soLinger) { + this.soLinger = soLinger; + } + + public boolean isTcpKeepAlive() { + return tcpKeepAlive; + } + + public void setTcpKeepAlive(boolean keepAlive) { + this.tcpKeepAlive = keepAlive; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/Wait.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/Wait.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/Wait.java new file mode 100644 index 0000000..e95a2a9 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/Wait.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.test; + +import java.util.concurrent.TimeUnit; + +public class Wait { + + public static final long MAX_WAIT_MILLIS = 30 * 1000; + public static final int SLEEP_MILLIS = 1000; + + public interface Condition { + boolean isSatisified() throws Exception; + } + + public static boolean waitFor(Condition condition) throws Exception { + return waitFor(condition, MAX_WAIT_MILLIS); + } + + public static boolean waitFor(final Condition condition, final long duration) throws Exception { + return waitFor(condition, duration, SLEEP_MILLIS); + } + + public static boolean waitFor(final Condition condition, final long duration, final int sleepMillis) throws Exception { + + final long expiry = System.currentTimeMillis() + duration; + boolean conditionSatisified = condition.isSatisified(); + while (!conditionSatisified && System.currentTimeMillis() < expiry) { + TimeUnit.MILLISECONDS.sleep(sleepMillis); + conditionSatisified = condition.isSatisified(); + } + return conditionSatisified; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java new file mode 100644 index 0000000..a3f5bcd --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.test.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ServerSocketFactory; + +/** + * Simple Netty Server used to echo all data. + */ +public class NettyEchoServer implements AutoCloseable { + + static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private Channel serverChannel; + private int serverPort; + + private final AtomicBoolean started = new AtomicBoolean(); + + public void start() throws Exception { + + if (started.compareAndSet(false, true)) { + + // Configure the server. + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + + ServerBootstrap server = new ServerBootstrap(); + server.group(bossGroup, workerGroup); + server.channel(NioServerSocketChannel.class); + server.option(ChannelOption.SO_BACKLOG, 100); + server.handler(new LoggingHandler(LogLevel.INFO)); + server.childHandler(new ChannelInitializer<Channel>() { + @Override + public void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new EchoServerHandler()); + } + }); + + // Start the server. + serverChannel = server.bind(getServerPort()).sync().channel(); + } + } + + public void stop() { + if (started.compareAndSet(true, false)) { + try { + serverChannel.close().sync(); + } catch (InterruptedException e) { + } + // Shut down all event loops to terminate all threads. + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + + @Override + public void close() { + stop(); + } + + public int getServerPort() { + if (serverPort == 0) { + ServerSocket ss = null; + try { + ss = ServerSocketFactory.getDefault().createServerSocket(0); + serverPort = ss.getLocalPort(); + } catch (IOException e) { // revert back to default + serverPort = PORT; + } finally { + try { + if (ss != null ) { + ss.close(); + } + } catch (IOException e) { // ignore + } + } + } + return serverPort; + } + + private class EchoServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ctx.write(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // Close the connection when an exception is raised. + cause.printStackTrace(); + ctx.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java new file mode 100644 index 0000000..2612c05 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.test.netty; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.Wait; +import org.apache.qpid.jms.transports.NettyTcpTransport; +import org.apache.qpid.jms.transports.TcpTransportOptions; +import org.apache.qpid.jms.transports.TransportListener; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.vertx.java.core.buffer.Buffer; + +/** + * Test basic functionality of the Netty based TCP transport. + */ +public class NettyTcpTransportTest extends QpidJmsTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransportTest.class); + + private boolean transportClosed; + private final List<Throwable> exceptions = new ArrayList<Throwable>(); + private final List<Buffer> data = new ArrayList<Buffer>(); + + private final TransportListener testListener = new NettyTransportListener(); + private final TcpTransportOptions testOptions = new TcpTransportOptions(); + + @Test(timeout = 60 * 1000) + public void testConnectToServer() throws Exception { + try (NettyEchoServer server = new NettyEchoServer()) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + try { + transport.connect(); + LOG.info("Connected to test server."); + } catch (Exception e) { + fail("Should have connected to the server"); + } + + assertTrue(transport.isConnected()); + + transport.close(); + } + + assertTrue(!transportClosed); // Normal shutdown does not trigger the event. + assertTrue(exceptions.isEmpty()); + assertTrue(data.isEmpty()); + } + + @Test(timeout = 60 * 1000) + public void testDetectServerClose() throws Exception { + NettyTcpTransport transport = null; + + try (NettyEchoServer server = new NettyEchoServer()) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + try { + transport.connect(); + LOG.info("Connected to test server."); + } catch (Exception e) { + fail("Should have connected to the server"); + } + + assertTrue(transport.isConnected()); + + server.close(); + } + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return transportClosed; + } + })); + assertTrue(exceptions.isEmpty()); + assertTrue(data.isEmpty()); + assertFalse(transport.isConnected()); + + try { + transport.close(); + } catch (Exception ex) { + fail("Close of a disconnect transport should not generate errors"); + } + } + + @Test(timeout = 60 * 1000) + public void testDataSentIsReceived() throws Exception { + final int SEND_BYTE_COUNT = 1024; + + try (NettyEchoServer server = new NettyEchoServer()) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + try { + transport.connect(); + LOG.info("Connected to test server."); + } catch (Exception e) { + fail("Should have connected to the server"); + } + + assertTrue(transport.isConnected()); + + ByteBuf sendBuffer = Unpooled.buffer(SEND_BYTE_COUNT); + for (int i = 0; i < SEND_BYTE_COUNT; ++i) { + sendBuffer.writeByte('A'); + } + + transport.send(sendBuffer); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !data.isEmpty(); + } + })); + + assertEquals(SEND_BYTE_COUNT, data.get(0).length()); + + transport.close(); + } + + assertTrue(!transportClosed); // Normal shutdown does not trigger the event. + assertTrue(exceptions.isEmpty()); + } + + private class NettyTransportListener implements TransportListener { + + @Override + public void onData(Buffer incoming) { + LOG.info("Client has new incoming data of size: {}", incoming.length()); + data.add(incoming); + } + + @Override + public void onTransportClosed() { + LOG.info("Transport reports that it has closed."); + transportClosed = true; + } + + @Override + public void onTransportError(Throwable cause) { + LOG.info("Transport error caught: {}", cause.getMessage()); + exceptions.add(cause); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org