Heavy refactoring of the Transport layer in the client. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/abde5ef2 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/abde5ef2 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/abde5ef2
Branch: refs/heads/master Commit: abde5ef20cc173f50d05afc812b93ad0a9cc892d Parents: 24bd2a9 Author: Timothy Bish <tabish...@gmail.com> Authored: Thu Jan 8 16:44:07 2015 -0500 Committer: Timothy Bish <tabish...@gmail.com> Committed: Thu Jan 8 16:44:07 2015 -0500 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpProvider.java | 53 ++- .../qpid/jms/provider/amqp/AmqpSslProvider.java | 47 --- .../provider/amqp/AmqpSslProviderFactory.java | 7 +- .../qpid/jms/transports/NettyTcpTransport.java | 197 ---------- .../qpid/jms/transports/RawTcpTransport.java | 377 ------------------- .../qpid/jms/transports/SslTransport.java | 59 --- .../jms/transports/TcpBufferedInputStream.java | 139 ------- .../jms/transports/TcpBufferedOutputStream.java | 126 ------- .../qpid/jms/transports/TcpTransport.java | 279 -------------- .../jms/transports/TcpTransportOptions.java | 153 -------- .../qpid/jms/transports/TransportFactory.java | 110 ++++++ .../qpid/jms/transports/TransportOptions.java | 155 ++++++++ .../jms/transports/netty/NettyTcpTransport.java | 224 +++++++++++ .../netty/NettyTcpTransportFactory.java | 63 ++++ .../jms/transports/plain/PlainTcpTransport.java | 362 ++++++++++++++++++ .../plain/PlainTcpTransportFactory.java | 63 ++++ .../plain/TcpBufferedInputStream.java | 139 +++++++ .../plain/TcpBufferedOutputStream.java | 126 +++++++ .../qpid/jms/transports/vertx/SslTransport.java | 86 +++++ .../transports/vertx/SslTransportFactory.java | 42 +++ .../qpid/jms/transports/vertx/TcpTransport.java | 254 +++++++++++++ .../transports/vertx/TcpTransportFactory.java | 63 ++++ .../services/org/apache/qpid/jms/transports/ssl | 17 + .../services/org/apache/qpid/jms/transports/tcp | 17 + .../jms/test/netty/NettyTcpTransportTest.java | 6 +- 25 files changed, 1753 insertions(+), 1411 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 98c788c..e68bc6e 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -45,10 +45,9 @@ import org.apache.qpid.jms.provider.AbstractProvider; import org.apache.qpid.jms.provider.AsyncResult; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFuture; -import org.apache.qpid.jms.transports.TcpTransport; +import org.apache.qpid.jms.transports.TransportFactory; import org.apache.qpid.jms.transports.TransportListener; import org.apache.qpid.jms.util.IOExceptionSupport; -import org.apache.qpid.jms.util.PropertyUtil; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Event; @@ -84,9 +83,11 @@ public class AmqpProvider extends AbstractProvider implements TransportListener // NOTE: Limit default channel max to signed short range to deal with // brokers that don't currently handle the unsigned range well. 
private static final int DEFAULT_CHANNEL_MAX = 32767; + private static final String DEFAULT_TRANSPORT_KEY = "tcp"; private AmqpConnection connection; private org.apache.qpid.jms.transports.Transport transport; + private String transportKey = DEFAULT_TRANSPORT_KEY; private boolean traceFrames; private boolean traceBytes; private boolean presettleConsumers; @@ -125,25 +126,12 @@ public class AmqpProvider extends AbstractProvider implements TransportListener public void connect() throws IOException { checkClosed(); - transport = createTransport(getRemoteURI()); - - Map<String, String> map = null; try { - map = PropertyUtil.parseQuery(remoteURI.getQuery()); + transport = TransportFactory.create(getTransportKey(), getRemoteURI()); } catch (Exception e) { throw IOExceptionSupport.create(e); } - Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "transport."); - - if (!PropertyUtil.setProperties(transport, providerOptions)) { - String msg = "" - + " Not all transport options could be set on the AMQP Provider transport." - + " Check the options are spelled correctly." - + " Given parameters=[" + providerOptions + "]." - + " This provider instance cannot be started."; - throw new IOException(msg); - } - + transport.setTransportListener(this); transport.connect(); } @@ -593,19 +581,6 @@ public class AmqpProvider extends AbstractProvider implements TransportListener }); } - /** - * Provides an extension point for subclasses to insert other types of transports such - * as SSL etc. - * - * @param remoteLocation - * The remote location where the transport should attempt to connect. - * - * @return the newly created transport instance. 
- */ - protected org.apache.qpid.jms.transports.Transport createTransport(URI remoteLocation) { - return new TcpTransport(this, remoteLocation); - } - private void updateTracer() { if (isTraceFrames()) { ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() { @@ -881,4 +856,22 @@ public class AmqpProvider extends AbstractProvider implements TransportListener public void setChannelMax(int channelMax) { this.channelMax = channelMax; } + + /** + * @return the transportKey that will be used to create the network level connection. + */ + public String getTransportKey() { + return transportKey; + } + + /** + * Sets the transport key used to lookup a Transport instance when an attempt + * is made to connect to a remote peer. + * + * @param transportKey + * the tansportKey to used when looking up a Transport to use. + */ + void setTransportKey(String transportKey) { + this.transportKey = transportKey; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java deleted file mode 100644 index af7fe7f..0000000 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProvider.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.jms.provider.amqp; - -import java.net.URI; -import java.util.Map; - -import org.apache.qpid.jms.JmsSslContext; -import org.apache.qpid.jms.transports.SslTransport; -import org.apache.qpid.jms.transports.Transport; - -/** - * AmqpProvider extension that enables SSL based transports. - */ -public class AmqpSslProvider extends AmqpProvider { - - private final JmsSslContext sslContext; - - public AmqpSslProvider(URI remoteURI) { - super(remoteURI); - this.sslContext = JmsSslContext.getCurrentSslContext(); - } - - public AmqpSslProvider(URI remoteURI, Map<String, String> extraOptions) { - super(remoteURI, extraOptions); - this.sslContext = JmsSslContext.getCurrentSslContext(); - } - - @Override - protected Transport createTransport(URI remoteLocation) { - return new SslTransport(this, remoteLocation, sslContext); - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java index 01a4f85..a19af00 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSslProviderFactory.java @@ -27,6 +27,11 @@ public class AmqpSslProviderFactory extends 
AmqpProviderFactory { @Override public Provider createProvider(URI remoteURI) throws Exception { - return new AmqpSslProvider(remoteURI); + AmqpProvider provider = new AmqpProvider(remoteURI); + // TODO - Would be better if we could do away with this and define + // the transport key in the properties file used to find the + // AmqpProcvider instance. + provider.setTransportKey("ssl"); + return provider; } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java deleted file mode 100644 index 49637c7..0000000 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java +++ /dev/null @@ -1,197 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.qpid.jms.transports; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.FixedRecvByteBufAllocator; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioSocketChannel; - -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.qpid.jms.util.IOExceptionSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.vertx.java.core.net.impl.PartialPooledByteBufAllocator; - -/** - * TCP based transport that uses Netty as the underlying IO layer. - */ -public class NettyTcpTransport implements Transport { - - private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class); - - private Bootstrap bootstrap; - private EventLoopGroup group; - private Channel channel; - private TransportListener listener; - private final TcpTransportOptions options; - private final URI remote; - - private final AtomicBoolean connected = new AtomicBoolean(); - private final AtomicBoolean closed = new AtomicBoolean(); - - /** - * Create a new transport instance - * - * @param options - * the transport options used to configure the socket connection. 
- */ - public NettyTcpTransport(TransportListener listener, URI remoteLocation, TcpTransportOptions options) { - this.options = options; - this.listener = listener; - this.remote = remoteLocation; - } - - @Override - public void connect() throws IOException { - - if (listener == null) { - throw new IllegalStateException("A transport listener must be set before connection attempts."); - } - - group = new NioEventLoopGroup(); - - bootstrap = new Bootstrap(); - bootstrap.group(group); - bootstrap.channel(NioSocketChannel.class); - bootstrap.handler(new ChannelInitializer<Channel>() { - - @Override - public void initChannel(Channel connectedChannel) throws Exception { - channel = connectedChannel; - channel.pipeline().addLast(new NettyTcpTransportHandler()); - } - }); - - configureNetty(bootstrap, options); - - ChannelFuture future = bootstrap.connect(remote.getHost(), remote.getPort()); - future.awaitUninterruptibly(); - - if (future.isCancelled()) { - throw new IOException("Connection attempt was cancelled"); - } else if (!future.isSuccess()) { - throw IOExceptionSupport.create(future.cause()); - } else { - connected.set(true); - } - } - - @Override - public boolean isConnected() { - return connected.get(); - } - - @Override - public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - channel.close(); - group.shutdownGracefully(); - } - } - - @Override - public void send(ByteBuffer output) throws IOException { - send(Unpooled.wrappedBuffer(output)); - } - - @Override - public void send(ByteBuf output) throws IOException { - channel.write(output); - channel.flush(); - } - - @Override - public TransportListener getTransportListener() { - return listener; - } - - @Override - public void setTransportListener(TransportListener listener) { - this.listener = listener; - } - - //----- Internal implementation details ----------------------------------// - - protected void configureNetty(Bootstrap bootstrap, TcpTransportOptions options) { - 
bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); - bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); - bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); - bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); - - if (options.getSendBufferSize() != -1) { - bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); - } - - if (options.getReceiveBufferSize() != -1) { - bootstrap.option(ChannelOption.SO_RCVBUF, options.getSendBufferSize()); - bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getSendBufferSize())); - } - - if (options.getTrafficClass() != -1) { - bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass()); - } - } - - //----- Handle connection events -----------------------------------------// - - private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> { - - @Override - public void channelActive(ChannelHandlerContext context) throws Exception { - LOG.info("Channel has become active! Channel is {}", context.channel()); - } - - @Override - public void channelInactive(ChannelHandlerContext context) throws Exception { - LOG.info("Channel has gone inactive! Channel is {}", context.channel()); - if (!closed.get()) { - connected.set(false); - listener.onTransportClosed(); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { - LOG.info("Exception on channel! 
Channel is {}", context.channel()); - if (!closed.get()) { - connected.set(false); - listener.onTransportError(cause); - } - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { - LOG.info("New data read: {} bytes incoming", buffer.readableBytes()); - listener.onData(buffer); - } - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java deleted file mode 100644 index 210da61..0000000 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/RawTcpTransport.java +++ /dev/null @@ -1,377 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.qpid.jms.transports; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.net.URI; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import javax.net.SocketFactory; - -import org.apache.qpid.jms.util.IOExceptionSupport; -import org.apache.qpid.jms.util.InetAddressUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - */ -public class RawTcpTransport implements Transport, Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(RawTcpTransport.class); - - private TransportListener listener; - private final URI remoteLocation; - private final AtomicBoolean connected = new AtomicBoolean(); - private final AtomicBoolean closed = new AtomicBoolean(); - private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>(); - - private final Socket socket; - private DataOutputStream dataOut; - private DataInputStream dataIn; - private Thread runner; - - private boolean closeAsync = true; - private int socketBufferSize = 64 * 1024; - private int soTimeout = 0; - private int soLinger = Integer.MIN_VALUE; - private Boolean keepAlive; - private Boolean tcpNoDelay = true; - private boolean useLocalHost = false; - private int ioBufferSize = 8 * 1024; - - /** - * Create a new instance of the transport. - * - * @param listener - * The TransportListener that will receive data from this Transport instance. 
- * @param remoteLocation - * The remote location where this transport should connection to. - */ - public RawTcpTransport(TransportListener listener, URI remoteLocation) { - this.listener = listener; - this.remoteLocation = remoteLocation; - - Socket temp = null; - try { - temp = createSocketFactory().createSocket(); - } catch (IOException e) { - connectionError.set(e); - } - - this.socket = temp; - } - - @Override - public void connect() throws IOException { - if (connectionError.get() != null) { - throw IOExceptionSupport.create(connectionError.get()); - } - - if (socket == null) { - throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set"); - } - - InetSocketAddress remoteAddress = null; - - if (remoteLocation != null) { - String host = resolveHostName(remoteLocation.getHost()); - remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); - } - - socket.connect(remoteAddress); - - connected.set(true); - - initialiseSocket(socket); - initializeStreams(); - - runner = new Thread(null, this, "QpidJMS RawTcpTransport: " + toString()); - runner.setDaemon(false); - runner.start(); - } - - @Override - public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - if (socket == null) { - return; - } - - // Closing the streams flush the sockets before closing.. if the socket - // is hung.. then this hangs the close so we support an asynchronous close - // by default which will timeout if the close doesn't happen after a delay. - if (closeAsync) { - final CountDownLatch latch = new CountDownLatch(1); - - final ExecutorService closer = Executors.newSingleThreadExecutor(); - closer.execute(new Runnable() { - @Override - public void run() { - LOG.trace("Closing socket {}", socket); - try { - socket.close(); - LOG.debug("Closed socket {}", socket); - } catch (IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Caught exception closing socket " + socket + ". 
This exception will be ignored.", e); - } - } finally { - latch.countDown(); - } - } - }); - - try { - latch.await(1,TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - closer.shutdownNow(); - } - } else { - LOG.trace("Closing socket {}", socket); - try { - socket.close(); - LOG.debug("Closed socket {}", socket); - } catch (IOException e) { - LOG.debug("Caught exception closing socket {}. This exception will be ignored.", socket, e); - } - } - } - } - - @Override - public void send(ByteBuffer output) throws IOException { - checkConnected(); - LOG.info("RawTcpTransport sending packet of size: {}", output.remaining()); - WritableByteChannel channel = Channels.newChannel(dataOut); - channel.write(output); - dataOut.flush(); - } - - @Override - public void send(ByteBuf output) throws IOException { - checkConnected(); - send(output.nioBuffer()); - } - - @Override - public boolean isConnected() { - return this.connected.get(); - } - - @Override - public TransportListener getTransportListener() { - return this.listener; - } - - @Override - public void setTransportListener(TransportListener listener) { - if (listener == null) { - throw new IllegalArgumentException("Listener cannot be set to null"); - } - - this.listener = listener; - } - - public int getSocketBufferSize() { - return socketBufferSize; - } - - public void setSocketBufferSize(int socketBufferSize) { - this.socketBufferSize = socketBufferSize; - } - - public int getSoTimeout() { - return soTimeout; - } - - public void setSoTimeout(int soTimeout) { - this.soTimeout = soTimeout; - } - - public boolean isTcpNoDelay() { - return tcpNoDelay; - } - - public void setTcpNoDelay(Boolean tcpNoDelay) { - this.tcpNoDelay = tcpNoDelay; - } - - public int getSoLinger() { - return soLinger; - } - - public void setSoLinger(int soLinger) { - this.soLinger = soLinger; - } - - public boolean isKeepAlive() { - return keepAlive; - } - - public void setKeepAlive(Boolean 
keepAlive) { - this.keepAlive = keepAlive; - } - - public boolean isUseLocalHost() { - return useLocalHost; - } - - public void setUseLocalHost(boolean useLocalHost) { - this.useLocalHost = useLocalHost; - } - - public int getIoBufferSize() { - return ioBufferSize; - } - - public void setIoBufferSize(int ioBufferSize) { - this.ioBufferSize = ioBufferSize; - } - - public boolean isCloseAsync() { - return closeAsync; - } - - public void setCloseAsync(boolean closeAsync) { - this.closeAsync = closeAsync; - } - - //---------- Transport internal implementation ---------------------------// - - @Override - public void run() { - LOG.trace("TCP consumer thread for " + this + " starting"); - try { - while (isConnected()) { - doRun(); - } - } catch (IOException e) { - connectionError.set(e); - onException(e); - } catch (Throwable e) { - IOException ioe = new IOException("Unexpected error occured: " + e); - connectionError.set(ioe); - ioe.initCause(e); - onException(ioe); - } - } - - protected void doRun() throws IOException { - int size = dataIn.available(); - if (size <= 0) { - try { - TimeUnit.NANOSECONDS.sleep(1); - } catch (InterruptedException e) { - } - return; - } - - byte[] buffer = new byte[size]; - dataIn.readFully(buffer); - listener.onData(Unpooled.wrappedBuffer(buffer)); - } - - /** - * Passes any IO exceptions into the transport listener - */ - public void onException(IOException e) { - if (listener != null) { - try { - listener.onTransportError(e); - } catch (RuntimeException e2) { - LOG.debug("Unexpected runtime exception: " + e2, e2); - } - } - } - - protected SocketFactory createSocketFactory() throws IOException { - return SocketFactory.getDefault(); - } - - protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException { - try { - sock.setReceiveBufferSize(socketBufferSize); - sock.setSendBufferSize(socketBufferSize); - } catch (SocketException se) { - LOG.warn("Cannot set socket buffer size = {}", socketBufferSize); - 
LOG.debug("Cannot set socket buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se); - } - - sock.setSoTimeout(soTimeout); - - if (keepAlive != null) { - sock.setKeepAlive(keepAlive.booleanValue()); - } - - if (soLinger > -1) { - sock.setSoLinger(true, soLinger); - } else if (soLinger == -1) { - sock.setSoLinger(false, 0); - } - - if (tcpNoDelay != null) { - sock.setTcpNoDelay(tcpNoDelay.booleanValue()); - } - } - - protected void initializeStreams() throws IOException { - try { - TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize); - this.dataIn = new DataInputStream(buffIn); - TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); - this.dataOut = new DataOutputStream(outputStream); - } catch (Throwable e) { - throw IOExceptionSupport.create(e); - } - } - - protected String resolveHostName(String host) throws UnknownHostException { - if (isUseLocalHost()) { - String localName = InetAddressUtil.getLocalHostName(); - if (localName != null && localName.equals(host)) { - return "localhost"; - } - } - return host; - } - - private void checkConnected() throws IOException { - if (!connected.get()) { - throw new IOException("Cannot send to a non-connected transport."); - } - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java deleted file mode 100644 index 0860aae..0000000 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/SslTransport.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.jms.transports; - -import java.io.IOException; -import java.net.URI; - -import org.apache.qpid.jms.JmsSslContext; -import org.vertx.java.core.net.NetClient; - -/** - * Provides SSL configuration to the Vert.x NetClient object used by the underling - * TCP based Transport. - */ -public class SslTransport extends TcpTransport { - - private final JmsSslContext context; - - /** - * Create an instance of the SSL transport - * - * @param listener - * The TransportListener that will handle events from this Transport instance. - * @param remoteLocation - * The location that is being connected to. - * @param context - * The JMS Framework SslContext to use for this SSL connection. 
- */ - public SslTransport(TransportListener listener, URI remoteLocation, JmsSslContext context) { - super(listener, remoteLocation); - - this.context = context; - } - - @Override - protected void configureNetClient(NetClient client) throws IOException { - super.configureNetClient(client); - - client.setSSL(true); - client.setKeyStorePath(context.getKeyStoreLocation()); - client.setKeyStorePassword(context.getKeyStorePassword()); - client.setTrustStorePath(context.getTrustStoreLocation()); - client.setTrustStorePassword(context.getTrustStorePassword()); - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java deleted file mode 100644 index c7ba887..0000000 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.qpid.jms.transports; - -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; - -/** - * An optimized buffered input stream for Tcp - */ -public class TcpBufferedInputStream extends FilterInputStream { - - private static final int DEFAULT_BUFFER_SIZE = 8192; - protected byte internalBuffer[]; - protected int count; - protected int position; - - public TcpBufferedInputStream(InputStream in) { - this(in, DEFAULT_BUFFER_SIZE); - } - - public TcpBufferedInputStream(InputStream in, int size) { - super(in); - if (size <= 0) { - throw new IllegalArgumentException("Buffer size <= 0"); - } - internalBuffer = new byte[size]; - } - - protected void fill() throws IOException { - byte[] buffer = internalBuffer; - count = 0; - position = 0; - int n = in.read(buffer, position, buffer.length - position); - if (n > 0) { - count = n + position; - } - } - - @Override - public int read() throws IOException { - if (position >= count) { - fill(); - if (position >= count) { - return -1; - } - } - return internalBuffer[position++] & 0xff; - } - - private int readStream(byte[] b, int off, int len) throws IOException { - int avail = count - position; - if (avail <= 0) { - if (len >= internalBuffer.length) { - return in.read(b, off, len); - } - fill(); - avail = count - position; - if (avail <= 0) { - return -1; - } - } - int cnt = (avail < len) ? avail : len; - System.arraycopy(internalBuffer, position, b, off, cnt); - position += cnt; - return cnt; - } - - @Override - public int read(byte b[], int off, int len) throws IOException { - if ((off | len | (off + len) | (b.length - (off + len))) < 0) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return 0; - } - int n = 0; - for (;;) { - int nread = readStream(b, off + n, len - n); - if (nread <= 0) { - return (n == 0) ? 
nread : n; - } - n += nread; - if (n >= len) { - return n; - } - // if not closed but no bytes available, return - InputStream input = in; - if (input != null && input.available() <= 0) { - return n; - } - } - } - - @Override - public long skip(long n) throws IOException { - if (n <= 0) { - return 0; - } - long avail = count - position; - if (avail <= 0) { - return in.skip(n); - } - long skipped = (avail < n) ? avail : n; - position += skipped; - return skipped; - } - - @Override - public int available() throws IOException { - return in.available() + (count - position); - } - - @Override - public boolean markSupported() { - return false; - } - - @Override - public void close() throws IOException { - if (in != null) { - in.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java deleted file mode 100644 index 82f8c41..0000000 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedOutputStream.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.jms.transports; - -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -/** - * An optimized buffered outputstream for Tcp - */ -public class TcpBufferedOutputStream extends FilterOutputStream { - - private static final int BUFFER_SIZE = 8192; - private final byte[] buffer; - private final int bufferlen; - private int count; - - /** - * Constructor - * - * @param out - */ - public TcpBufferedOutputStream(OutputStream out) { - this(out, BUFFER_SIZE); - } - - /** - * Creates a new buffered output stream to write data to the specified underlying output - * stream with the specified buffer size. - * - * @param out - * the underlying output stream. - * @param size - * the buffer size. - * @throws IllegalArgumentException - * if size <= 0. 
- */ - public TcpBufferedOutputStream(OutputStream out, int size) { - super(out); - if (size <= 0) { - throw new IllegalArgumentException("Buffer size <= 0"); - } - buffer = new byte[size]; - bufferlen = size; - } - - /** - * write a byte on to the stream - * - * @param b - * - byte to write - * @throws IOException - */ - @Override - public void write(int b) throws IOException { - if ((bufferlen - count) < 1) { - flush(); - } - buffer[count++] = (byte) b; - } - - /** - * write a byte array to the stream - * - * @param b - * the byte buffer - * @param off - * the offset into the buffer - * @param len - * the length of data to write - * @throws IOException - */ - @Override - public void write(byte b[], int off, int len) throws IOException { - if (b != null) { - if ((bufferlen - count) < len) { - flush(); - } - if (buffer.length >= len) { - System.arraycopy(b, off, buffer, count, len); - count += len; - } else { - out.write(b, off, len); - } - } - } - - /** - * flush the data to the output stream This doesn't call flush on the underlying - * outputstream, because Tcp is particularly efficent at doing this itself .... 
- * - * @throws IOException - */ - @Override - public void flush() throws IOException { - if (count > 0 && out != null) { - out.write(buffer, 0, count); - count = 0; - } - } - - /** - * close this stream - * - * @throws IOException - */ - @Override - public void close() throws IOException { - super.close(); - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java deleted file mode 100644 index bc2cada..0000000 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransport.java +++ /dev/null @@ -1,279 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.qpid.jms.transports; - -import io.netty.buffer.ByteBuf; - -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.qpid.jms.util.IOExceptionSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.vertx.java.core.AsyncResult; -import org.vertx.java.core.AsyncResultHandler; -import org.vertx.java.core.Handler; -import org.vertx.java.core.Vertx; -import org.vertx.java.core.buffer.Buffer; -import org.vertx.java.core.impl.DefaultVertxFactory; -import org.vertx.java.core.net.NetClient; -import org.vertx.java.core.net.NetSocket; - -/** - * Vertex based TCP transport for raw data packets. - */ -public class TcpTransport implements Transport { - - private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class); - - private final Vertx vertx; - private final NetClient client; - private final URI remoteLocation; - private final AtomicBoolean connected = new AtomicBoolean(); - private final AtomicBoolean closed = new AtomicBoolean(); - private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>(); - - private NetSocket socket; - - private TransportListener listener; - private int socketBufferSize = 64 * 1024; - private int soTimeout = -1; - private int connectTimeout = -1; - private int soLinger = Integer.MIN_VALUE; - private boolean keepAlive; - private boolean tcpNoDelay = true; - - /** - * Create a new instance of the transport. - * - * @param listener - * The TransportListener that will receive data from this Transport instance. - * @param remoteLocation - * The remote location where this transport should connection to. 
- */ - public TcpTransport(TransportListener listener, URI remoteLocation) { - this.listener = listener; - this.remoteLocation = remoteLocation; - - DefaultVertxFactory vertxFactory = new DefaultVertxFactory(); - this.vertx = vertxFactory.createVertx(); - this.client = vertx.createNetClient(); - } - - @Override - public void connect() throws IOException { - final CountDownLatch connectLatch = new CountDownLatch(1); - - if (listener == null) { - throw new IllegalStateException("A transport listener must be set before connection attempts."); - } - - configureNetClient(client); - - try { - client.connect(remoteLocation.getPort(), remoteLocation.getHost(), new AsyncResultHandler<NetSocket>() { - @Override - public void handle(AsyncResult<NetSocket> asyncResult) { - if (asyncResult.succeeded()) { - socket = asyncResult.result(); - LOG.info("We have connected! Socket is {}", socket); - - connected.set(true); - connectLatch.countDown(); - - socket.dataHandler(new Handler<Buffer>() { - @Override - public void handle(Buffer event) { - listener.onData(event.getByteBuf()); - } - }); - - socket.closeHandler(new Handler<Void>() { - @Override - public void handle(Void event) { - if (!closed.get()) { - connected.set(false); - listener.onTransportClosed(); - } - } - }); - - socket.exceptionHandler(new Handler<Throwable>() { - @Override - public void handle(Throwable event) { - if (!closed.get()) { - connected.set(false); - listener.onTransportError(event); - } - } - }); - - } else { - connected.set(false); - connectionError.set(asyncResult.cause()); - connectLatch.countDown(); - } - } - }); - } catch (Throwable reason) { - LOG.info("Failed to connect to target Broker: {}", reason); - throw IOExceptionSupport.create(reason); - } - - try { - connectLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - if (connectionError.get() != null) { - throw IOExceptionSupport.create(connectionError.get()); - } - } - - @Override - public void close() 
throws IOException { - if (closed.compareAndSet(false, true)) { - if (connected.get()) { - socket.close(); - connected.set(false); - } - - vertx.stop(); - } - } - - @Override - public void send(ByteBuffer output) throws IOException { - checkConnected(); - int length = output.remaining(); - if (length == 0) { - return; - } - - byte[] copy = new byte[length]; - output.get(copy); - Buffer sendBuffer = new Buffer(copy); - - vertx.eventBus().send(socket.writeHandlerID(), sendBuffer); - } - - @Override - public void send(ByteBuf output) throws IOException { - checkConnected(); - int length = output.readableBytes(); - if (length == 0) { - return; - } - - Buffer sendBuffer = new Buffer(output.copy()); - vertx.eventBus().send(socket.writeHandlerID(), sendBuffer); - } - - /** - * Allows a subclass to configure the NetClient beyond what this transport might do. - * - * @throws IOException if an error occurs. - */ - protected void configureNetClient(NetClient client) throws IOException { - client.setSendBufferSize(getSocketBufferSize()); - client.setReceiveBufferSize(getSocketBufferSize()); - client.setSoLinger(soLinger); - client.setTCPKeepAlive(keepAlive); - client.setTCPNoDelay(tcpNoDelay); - if (connectTimeout >= 0) { - client.setConnectTimeout(connectTimeout); - } - } - - @Override - public boolean isConnected() { - return this.connected.get(); - } - - private void checkConnected() throws IOException { - if (!connected.get()) { - throw new IOException("Cannot send to a non-connected transport."); - } - } - - @Override - public TransportListener getTransportListener() { - return this.listener; - } - - @Override - public void setTransportListener(TransportListener listener) { - if (listener == null) { - throw new IllegalArgumentException("Listener cannot be set to null"); - } - - this.listener = listener; - } - - public int getSocketBufferSize() { - return socketBufferSize; - } - - public void setSocketBufferSize(int socketBufferSize) { - this.socketBufferSize = 
socketBufferSize; - } - - public int getSoTimeout() { - return soTimeout; - } - - public void setSoTimeout(int soTimeout) { - this.soTimeout = soTimeout; - } - - public boolean isTcpNoDelay() { - return tcpNoDelay; - } - - public void setTcpNoDelay(boolean tcpNoDelay) { - this.tcpNoDelay = tcpNoDelay; - } - - public int getSoLinger() { - return soLinger; - } - - public void setSoLinger(int soLinger) { - this.soLinger = soLinger; - } - - public boolean isKeepAlive() { - return keepAlive; - } - - public void setKeepAlive(boolean keepAlive) { - this.keepAlive = keepAlive; - } - - public int getConnectTimeout() { - return connectTimeout; - } - - public void setConnectTimeout(int connectTimeout) { - this.connectTimeout = connectTimeout; - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java deleted file mode 100644 index e5f90c3..0000000 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.jms.transports; - -/** - * Encapsulates all the TCP Transport options in one configuration object. - */ -public class TcpTransportOptions { - - public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024; - public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE; - public static final int DEFAULT_TRAFFIC_CLASS = 0; - public static final boolean DEFAULT_TCP_NO_DELAY = true; - public static final boolean DEFAULT_TCP_KEEP_ALIVE = false; - public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE; - public static final int DEFAULT_SO_TIMEOUT = -1; - public static final int DEFAULT_CONNECT_TIMEOUT = 60000; - - private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE; - private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE; - private int trafficClass = DEFAULT_TRAFFIC_CLASS; - private int connectTimeout = DEFAULT_CONNECT_TIMEOUT; - private int soTimeout = DEFAULT_SO_TIMEOUT; - private int soLinger = DEFAULT_SO_LINGER; - private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE; - private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; - - /** - * @return the currently set send buffer size in bytes. - */ - public int getSendBufferSize() { - return sendBufferSize; - } - - /** - * Sets the send buffer size in bytes, the value must be greater than zero - * or an {@link IllegalArgumentException} will be thrown. - * - * @param sendBufferSize - * the new send buffer size for the TCP Transport. - * - * @throws IllegalArgumentException if the value given is not in the valid range. 
- */ - public void setSendBufferSize(int sendBufferSize) { - if (sendBufferSize <= 0) { - throw new IllegalArgumentException("The send buffer size must be > 0"); - } - - this.sendBufferSize = sendBufferSize; - } - - /** - * @return the currently configured receive buffer size in bytes. - */ - public int getReceiveBufferSize() { - return receiveBufferSize; - } - - /** - * Sets the receive buffer size in bytes, the value must be greater than zero - * or an {@link IllegalArgumentException} will be thrown. - * - * @param receiveBufferSize - * the new receive buffer size for the TCP Transport. - * - * @throws IllegalArgumentException if the value given is not in the valid range. - */ - public void setReceiveBufferSize(int receiveBufferSize) { - if (receiveBufferSize <= 0) { - throw new IllegalArgumentException("The send buffer size must be > 0"); - } - - this.receiveBufferSize = receiveBufferSize; - } - - /** - * @return the currently configured traffic class value. - */ - public int getTrafficClass() { - return trafficClass; - } - - /** - * Sets the traffic class value used by the TCP connection, valid - * range is between 0 and 255. - * - * @param trafficClass - * the new traffic class value. - * - * @throws IllegalArgumentException if the value given is not in the valid range. 
- */ - public void setTrafficClass(int trafficClass) { - if (trafficClass < 0 || trafficClass > 255) { - throw new IllegalArgumentException("Traffic class must be in the range [0..255]"); - } - - this.trafficClass = trafficClass; - } - - public int getSoTimeout() { - return soTimeout; - } - - public void setSoTimeout(int soTimeout) { - this.soTimeout = soTimeout; - } - - public boolean isTcpNoDelay() { - return tcpNoDelay; - } - - public void setTcpNoDelay(boolean tcpNoDelay) { - this.tcpNoDelay = tcpNoDelay; - } - - public int getSoLinger() { - return soLinger; - } - - public void setSoLinger(int soLinger) { - this.soLinger = soLinger; - } - - public boolean isTcpKeepAlive() { - return tcpKeepAlive; - } - - public void setTcpKeepAlive(boolean keepAlive) { - this.tcpKeepAlive = keepAlive; - } - - public int getConnectTimeout() { - return connectTimeout; - } - - public void setConnectTimeout(int connectTimeout) { - this.connectTimeout = connectTimeout; - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java new file mode 100644 index 0000000..463adfb --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports; + +import java.io.IOException; +import java.net.URI; + +import org.apache.qpid.jms.util.FactoryFinder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class that all TransportFactory implementations must extend. + */ +public abstract class TransportFactory { + + private static final Logger LOG = LoggerFactory.getLogger(TransportFactory.class); + + private static final FactoryFinder<TransportFactory> TRANSPORT_FACTORY_FINDER = + new FactoryFinder<TransportFactory>(TransportFactory.class, + "META-INF/services/" + TransportFactory.class.getPackage().getName().replace(".", "/") + "/"); + + /** + * Creates an instance of the given Transport and configures it using the + * properties set on the given remote broker URI. + * + * @param remoteURI + * The URI used to connect to a remote Broker. + * + * @return a new Transport instance. + * + * @throws Exception if an error occurs while creating the Transport instance. + */ + public abstract Transport createTransport(URI remoteURI) throws Exception; + + /** + * @return the name of this Transport. + */ + public abstract String getName(); + + /** + * Static create method that performs the TransportFactory search and handles the + * configuration and setup. + * + * @param transportKey + * The transport type name used to locate a TransportFactory. + * @param remoteURI + * the URI of the remote peer. + * + * @return a new Transport instance that is ready for use. + * + * @throws Exception if an error occurs while creating the Transport instance. 
+ */ + public static Transport create(String transportKey, URI remoteURI) throws Exception { + Transport result = null; + + try { + TransportFactory factory = findTransportFactory(transportKey); + result = factory.createTransport(remoteURI); + } catch (Exception ex) { + LOG.error("Failed to create Transport instance for {}, due to: {}", remoteURI.getScheme(), ex); + LOG.trace("Error: ", ex); + throw ex; + } + + return result; + } + + /** + * Searches for a TransportFactory by using the scheme from the given key. + * + * The search first checks the local cache of Transport factories before moving on + * to search in the class-path. + * + * @param transportKey + * The transport type name used to locate a TransportFactory. + * + * @return a Transport factory instance matching the transport key. + * + * @throws IOException if an error occurs while locating the factory. + */ + public static TransportFactory findTransportFactory(String transportKey) throws IOException { + if (transportKey == null) { + throw new IOException("No Transport key specified"); + } + + TransportFactory factory = null; + try { + factory = TRANSPORT_FACTORY_FINDER.newInstance(transportKey); + } catch (Throwable e) { + throw new IOException("Transport type NOT recognized: [" + transportKey + "]", e); + } + + return factory; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java new file mode 100644 index 0000000..f0e34ca --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports; + +/** + * Encapsulates all the TCP Transport options in one configuration object. + */ +public class TransportOptions { + + public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024; + public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE; + public static final int DEFAULT_TRAFFIC_CLASS = 0; + public static final boolean DEFAULT_TCP_NO_DELAY = true; + public static final boolean DEFAULT_TCP_KEEP_ALIVE = false; + public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE; + public static final int DEFAULT_SO_TIMEOUT = -1; + public static final int DEFAULT_CONNECT_TIMEOUT = 60000; + + public static final TransportOptions DEFAULT_OPTIONS = new TransportOptions(); + + private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE; + private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE; + private int trafficClass = DEFAULT_TRAFFIC_CLASS; + private int connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private int soTimeout = DEFAULT_SO_TIMEOUT; + private int soLinger = DEFAULT_SO_LINGER; + private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE; + private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; + + /** + * @return the currently set send buffer size in bytes. 
+ */ + public int getSendBufferSize() { + return sendBufferSize; + } + + /** + * Sets the send buffer size in bytes, the value must be greater than zero + * or an {@link IllegalArgumentException} will be thrown. + * + * @param sendBufferSize + * the new send buffer size for the TCP Transport. + * + * @throws IllegalArgumentException if the value given is not in the valid range. + */ + public void setSendBufferSize(int sendBufferSize) { + if (sendBufferSize <= 0) { + throw new IllegalArgumentException("The send buffer size must be > 0"); + } + + this.sendBufferSize = sendBufferSize; + } + + /** + * @return the currently configured receive buffer size in bytes. + */ + public int getReceiveBufferSize() { + return receiveBufferSize; + } + + /** + * Sets the receive buffer size in bytes, the value must be greater than zero + * or an {@link IllegalArgumentException} will be thrown. + * + * @param receiveBufferSize + * the new receive buffer size for the TCP Transport. + * + * @throws IllegalArgumentException if the value given is not in the valid range. + */ + public void setReceiveBufferSize(int receiveBufferSize) { + if (receiveBufferSize <= 0) { + throw new IllegalArgumentException("The receive buffer size must be > 0"); + } + + this.receiveBufferSize = receiveBufferSize; + } + + /** + * @return the currently configured traffic class value. + */ + public int getTrafficClass() { + return trafficClass; + } + + /** + * Sets the traffic class value used by the TCP connection, valid + * range is between 0 and 255. + * + * @param trafficClass + * the new traffic class value. + * + * @throws IllegalArgumentException if the value given is not in the valid range. 
+ */ + public void setTrafficClass(int trafficClass) { + if (trafficClass < 0 || trafficClass > 255) { + throw new IllegalArgumentException("Traffic class must be in the range [0..255]"); + } + + this.trafficClass = trafficClass; + } + + public int getSoTimeout() { + return soTimeout; + } + + public void setSoTimeout(int soTimeout) { + this.soTimeout = soTimeout; + } + + public boolean isTcpNoDelay() { + return tcpNoDelay; + } + + public void setTcpNoDelay(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public int getSoLinger() { + return soLinger; + } + + public void setSoLinger(int soLinger) { + this.soLinger = soLinger; + } + + public boolean isTcpKeepAlive() { + return tcpKeepAlive; + } + + public void setTcpKeepAlive(boolean keepAlive) { + this.tcpKeepAlive = keepAlive; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java new file mode 100644 index 0000000..3d4a928 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.transports.Transport; +import org.apache.qpid.jms.transports.TransportListener; +import org.apache.qpid.jms.util.IOExceptionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.vertx.java.core.net.impl.PartialPooledByteBufAllocator; + +/** + * TCP based transport that uses Netty as the underlying IO layer. 
+ */ +public class NettyTcpTransport implements Transport { + + private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class); + + private Bootstrap bootstrap; + private EventLoopGroup group; + private Channel channel; + private TransportListener listener; + private TransportOptions options; + private final URI remote; + + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); + + /** + * Create a new transport instance + * + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. + */ + public NettyTcpTransport(URI remoteLocation, TransportOptions options) { + this(null, remoteLocation, options); + } + + /** + * Create a new transport instance + * + * @param listener + * the TransportListener that will receive events from this Transport. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. 
+ */ + public NettyTcpTransport(TransportListener listener, URI remoteLocation, TransportOptions options) { + this.options = options; + this.listener = listener; + this.remote = remoteLocation; + } + + @Override + public void connect() throws IOException { + + if (listener == null) { + throw new IllegalStateException("A transport listener must be set before connection attempts."); + } + + group = new NioEventLoopGroup(); + + bootstrap = new Bootstrap(); + bootstrap.group(group); + bootstrap.channel(NioSocketChannel.class); + bootstrap.handler(new ChannelInitializer<Channel>() { + + @Override + public void initChannel(Channel connectedChannel) throws Exception { + channel = connectedChannel; + channel.pipeline().addLast(new NettyTcpTransportHandler()); + } + }); + + configureNetty(bootstrap, getTransportOptions()); + + ChannelFuture future = bootstrap.connect(remote.getHost(), remote.getPort()); + future.awaitUninterruptibly(); + + if (future.isCancelled()) { + throw new IOException("Connection attempt was cancelled"); + } else if (!future.isSuccess()) { + throw IOExceptionSupport.create(future.cause()); + } else { + connected.set(true); + } + } + + @Override + public boolean isConnected() { + return connected.get(); + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + channel.close(); + group.shutdownGracefully(); + } + } + + @Override + public void send(ByteBuffer output) throws IOException { + send(Unpooled.wrappedBuffer(output)); + } + + @Override + public void send(ByteBuf output) throws IOException { + channel.write(output); + channel.flush(); + } + + @Override + public TransportListener getTransportListener() { + return listener; + } + + @Override + public void setTransportListener(TransportListener listener) { + this.listener = listener; + } + + public TransportOptions getTransportOptions() { + if (options == null) { + options = TransportOptions.DEFAULT_OPTIONS; + } + + return options; + } + + //----- 
Internal implementation details ----------------------------------// + + protected void configureNetty(Bootstrap bootstrap, TransportOptions options) { + bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); + bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); + bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); + bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); + + if (options.getSendBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); + } + + if (options.getReceiveBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_RCVBUF, options.getSendBufferSize()); + bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getSendBufferSize())); + } + + if (options.getTrafficClass() != -1) { + bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass()); + } + } + + //----- Handle connection events -----------------------------------------// + + private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> { + + @Override + public void channelActive(ChannelHandlerContext context) throws Exception { + LOG.info("Channel has become active! Channel is {}", context.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext context) throws Exception { + LOG.info("Channel has gone inactive! Channel is {}", context.channel()); + if (!closed.get()) { + connected.set(false); + listener.onTransportClosed(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { + LOG.info("Exception on channel! 
Channel is {}", context.channel()); + if (!closed.get()) { + connected.set(false); + listener.onTransportError(cause); + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + LOG.info("New data read: {} bytes incoming", buffer.readableBytes()); + listener.onData(buffer); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactory.java new file mode 100644 index 0000000..d51013c --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactory.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.transports.netty; + +import java.net.URI; +import java.util.Map; + +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.transports.Transport; +import org.apache.qpid.jms.transports.TransportFactory; +import org.apache.qpid.jms.util.PropertyUtil; + +/** + * Factory for creating the Netty based TCP Transport. + */ +public class NettyTcpTransportFactory extends TransportFactory { + + @Override + public Transport createTransport(URI remoteURI) throws Exception { + + Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery()); + Map<String, String> transportURIOptions = PropertyUtil.filterProperties(map, "transport."); + + remoteURI = PropertyUtil.replaceQuery(remoteURI, map); + + TransportOptions transportOptions = new TransportOptions(); + + if (!PropertyUtil.setProperties(transportOptions, transportURIOptions)) { + String msg = " Not all transport options could be set on the Transport." + + " Check the options are spelled correctly." + + " Given parameters=[" + transportURIOptions + "]." + + " This provider instance cannot be started."; + throw new IllegalArgumentException(msg); + } + + Transport result = doCreateTransport(remoteURI, transportOptions); + + return result; + } + + protected NettyTcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception { + return new NettyTcpTransport(remoteURI, transportOptions); + } + + @Override + public String getName() { + return "TCP"; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org