Repository: qpid-jms Updated Branches: refs/heads/master 24bd2a919 -> abde5ef20
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java new file mode 100644 index 0000000..bb996f5 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.plain; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.net.SocketFactory; + +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.transports.Transport; +import org.apache.qpid.jms.transports.TransportListener; +import org.apache.qpid.jms.util.IOExceptionSupport; +import org.apache.qpid.jms.util.InetAddressUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class PlainTcpTransport implements Transport, Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(PlainTcpTransport.class); + + private TransportListener listener; + private final URI remoteLocation; + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>(); + + private final Socket socket; + private DataOutputStream dataOut; + private DataInputStream dataIn; + private Thread runner; + + private TransportOptions options; + + private boolean closeAsync = true; + private boolean useLocalHost = false; + private int ioBufferSize = 8 * 1024; + + /** + * Create a new transport instance + * + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. + */ + public PlainTcpTransport(URI remoteLocation, TransportOptions options) { + this(null, remoteLocation, options); + } + + /** + * Create a new transport instance + * + * @param listener + * the TransportListener that will receive events from this Transport. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. + */ + public PlainTcpTransport(TransportListener listener, URI remoteLocation, TransportOptions options) { + this.options = options; + this.listener = listener; + this.remoteLocation = remoteLocation; + + Socket temp = null; + try { + temp = createSocketFactory().createSocket(); + } catch (IOException e) { + connectionError.set(e); + } + + this.socket = temp; + } + + @Override + public void connect() throws IOException { + if (connectionError.get() != null) { + throw IOExceptionSupport.create(connectionError.get()); + } + + if (socket == null) { + throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set"); + } + + InetSocketAddress remoteAddress = null; + + if (remoteLocation != null) { + String host = resolveHostName(remoteLocation.getHost()); + remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); + } + + socket.connect(remoteAddress); + + connected.set(true); + + initialiseSocket(socket); + initializeStreams(); + + runner = new Thread(null, this, "QpidJMS " + getClass().getSimpleName() + ": " + toString()); + runner.setDaemon(false); + runner.start(); + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + if (socket == null) { + return; + } + + // Closing the streams flush the sockets before closing.. if the socket + // is hung.. then this hangs the close so we support an asynchronous close + // by default which will timeout if the close doesn't happen after a delay. + if (closeAsync) { + final CountDownLatch latch = new CountDownLatch(1); + + final ExecutorService closer = Executors.newSingleThreadExecutor(); + closer.execute(new Runnable() { + @Override + public void run() { + LOG.trace("Closing socket {}", socket); + try { + socket.close(); + LOG.debug("Closed socket {}", socket); + } catch (IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e); + } + } finally { + latch.countDown(); + } + } + }); + + try { + latch.await(1,TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + closer.shutdownNow(); + } + } else { + LOG.trace("Closing socket {}", socket); + try { + socket.close(); + LOG.debug("Closed socket {}", socket); + } catch (IOException e) { + LOG.debug("Caught exception closing socket {}. This exception will be ignored.", socket, e); + } + } + } + } + + @Override + public void send(ByteBuffer output) throws IOException { + checkConnected(); + LOG.info("RawTcpTransport sending packet of size: {}", output.remaining()); + WritableByteChannel channel = Channels.newChannel(dataOut); + channel.write(output); + dataOut.flush(); + } + + @Override + public void send(ByteBuf output) throws IOException { + checkConnected(); + send(output.nioBuffer()); + } + + @Override + public boolean isConnected() { + return this.connected.get(); + } + + @Override + public TransportListener getTransportListener() { + return this.listener; + } + + @Override + public void setTransportListener(TransportListener listener) { + if (listener == null) { + throw new IllegalArgumentException("Listener cannot be set to null"); + } + + this.listener = listener; + } + + public TransportOptions getTransportOptions() { + if (options == null) { + options = TransportOptions.DEFAULT_OPTIONS; + } + + return options; + } + + public boolean isUseLocalHost() { + return useLocalHost; + } + + public void setUseLocalHost(boolean useLocalHost) { + this.useLocalHost = useLocalHost; + } + + public int getIoBufferSize() { + return ioBufferSize; + } + + public void setIoBufferSize(int ioBufferSize) { + this.ioBufferSize = ioBufferSize; + } + + public boolean isCloseAsync() { + return closeAsync; + } + + public void setCloseAsync(boolean closeAsync) { + this.closeAsync = closeAsync; + } + + //---------- Transport internal implementation ---------------------------// + + @Override + public void run() { + LOG.trace("TCP consumer thread for " + this + " starting"); + try { + while (isConnected()) { + doRun(); + } + } catch (IOException e) { + connectionError.set(e); + onException(e); + } catch (Throwable e) { + IOException ioe = new IOException("Unexpected error occured: " + e); + connectionError.set(ioe); + ioe.initCause(e); + onException(ioe); + } + } + + protected void doRun() throws IOException { + int size = dataIn.available(); + if (size <= 0) { + try { + TimeUnit.NANOSECONDS.sleep(1); + } catch (InterruptedException e) { + } + return; + } + + byte[] buffer = new byte[size]; + dataIn.readFully(buffer); + listener.onData(Unpooled.wrappedBuffer(buffer)); + } + + /** + * Passes any IO exceptions into the transport listener + */ + public void onException(IOException e) { + if (listener != null) { + try { + listener.onTransportError(e); + } catch (RuntimeException e2) { + LOG.debug("Unexpected runtime exception: " + e2, e2); + } + } + } + + protected SocketFactory createSocketFactory() throws IOException { + return SocketFactory.getDefault(); + } + + protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException { + TransportOptions options = getTransportOptions(); + + try { + sock.setReceiveBufferSize(options.getReceiveBufferSize()); + } catch (SocketException se) { + LOG.warn("Cannot set socket receive buffer size = {}", options.getReceiveBufferSize()); + LOG.debug("Cannot set socket receive buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se); + } + + try { + sock.setSendBufferSize(options.getSendBufferSize()); + } catch (SocketException se) { + LOG.warn("Cannot set socket send buffer size = {}", options.getSendBufferSize()); + LOG.debug("Cannot set socket send buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se); + } + + sock.setSoTimeout(options.getSoTimeout()); + sock.setKeepAlive(options.isTcpKeepAlive()); + sock.setTcpNoDelay(options.isTcpNoDelay()); + + if (options.getSoLinger() > 0) { + sock.setSoLinger(true, options.getSoLinger()); + } else { + sock.setSoLinger(false, 0); + } + } + + protected void initializeStreams() throws IOException { + try { + TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize); + this.dataIn = new DataInputStream(buffIn); + TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); + this.dataOut = new DataOutputStream(outputStream); + } catch (Throwable e) { + throw IOExceptionSupport.create(e); + } + } + + protected String resolveHostName(String host) throws UnknownHostException { + if (isUseLocalHost()) { + String localName = InetAddressUtil.getLocalHostName(); + if (localName != null && localName.equals(host)) { + return "localhost"; + } + } + return host; + } + + private void checkConnected() throws IOException { + if (!connected.get()) { + throw new IOException("Cannot send to a non-connected transport."); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java new file mode 100644 index 0000000..3894408 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.plain; + +import java.net.URI; +import java.util.Map; + +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.transports.Transport; +import org.apache.qpid.jms.transports.TransportFactory; +import org.apache.qpid.jms.util.PropertyUtil; + +/** + * Factory for creating the Plain TCP transport. + */ +public class PlainTcpTransportFactory extends TransportFactory { + + @Override + public Transport createTransport(URI remoteURI) throws Exception { + + Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery()); + Map<String, String> transportURIOptions = PropertyUtil.filterProperties(map, "transport."); + + remoteURI = PropertyUtil.replaceQuery(remoteURI, map); + + TransportOptions transportOptions = new TransportOptions(); + + if (!PropertyUtil.setProperties(transportOptions, transportURIOptions)) { + String msg = " Not all transport options could be set on the Transport." + + " Check the options are spelled correctly." + + " Given parameters=[" + transportURIOptions + "]." + + " This provider instance cannot be started."; + throw new IllegalArgumentException(msg); + } + + Transport result = doCreateTransport(remoteURI, transportOptions); + + return result; + } + + protected PlainTcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception { + return new PlainTcpTransport(remoteURI, transportOptions); + } + + @Override + public String getName() { + return "TCP"; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java new file mode 100644 index 0000000..ee4bf5c --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.plain; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * An optimized buffered input stream for Tcp + */ +public class TcpBufferedInputStream extends FilterInputStream { + + private static final int DEFAULT_BUFFER_SIZE = 8192; + protected byte internalBuffer[]; + protected int count; + protected int position; + + public TcpBufferedInputStream(InputStream in) { + this(in, DEFAULT_BUFFER_SIZE); + } + + public TcpBufferedInputStream(InputStream in, int size) { + super(in); + if (size <= 0) { + throw new IllegalArgumentException("Buffer size <= 0"); + } + internalBuffer = new byte[size]; + } + + protected void fill() throws IOException { + byte[] buffer = internalBuffer; + count = 0; + position = 0; + int n = in.read(buffer, position, buffer.length - position); + if (n > 0) { + count = n + position; + } + } + + @Override + public int read() throws IOException { + if (position >= count) { + fill(); + if (position >= count) { + return -1; + } + } + return internalBuffer[position++] & 0xff; + } + + private int readStream(byte[] b, int off, int len) throws IOException { + int avail = count - position; + if (avail <= 0) { + if (len >= internalBuffer.length) { + return in.read(b, off, len); + } + fill(); + avail = count - position; + if (avail <= 0) { + return -1; + } + } + int cnt = (avail < len) ? avail : len; + System.arraycopy(internalBuffer, position, b, off, cnt); + position += cnt; + return cnt; + } + + @Override + public int read(byte b[], int off, int len) throws IOException { + if ((off | len | (off + len) | (b.length - (off + len))) < 0) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + int n = 0; + for (;;) { + int nread = readStream(b, off + n, len - n); + if (nread <= 0) { + return (n == 0) ? nread : n; + } + n += nread; + if (n >= len) { + return n; + } + // if not closed but no bytes available, return + InputStream input = in; + if (input != null && input.available() <= 0) { + return n; + } + } + } + + @Override + public long skip(long n) throws IOException { + if (n <= 0) { + return 0; + } + long avail = count - position; + if (avail <= 0) { + return in.skip(n); + } + long skipped = (avail < n) ? avail : n; + position += skipped; + return skipped; + } + + @Override + public int available() throws IOException { + return in.available() + (count - position); + } + + @Override + public boolean markSupported() { + return false; + } + + @Override + public void close() throws IOException { + if (in != null) { + in.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java new file mode 100644 index 0000000..84359ae --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.plain; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +/** + * An optimized buffered outputstream for Tcp + */ +public class TcpBufferedOutputStream extends FilterOutputStream { + + private static final int BUFFER_SIZE = 8192; + private final byte[] buffer; + private final int bufferlen; + private int count; + + /** + * Constructor + * + * @param out + */ + public TcpBufferedOutputStream(OutputStream out) { + this(out, BUFFER_SIZE); + } + + /** + * Creates a new buffered output stream to write data to the specified underlying output + * stream with the specified buffer size. + * + * @param out + * the underlying output stream. + * @param size + * the buffer size. + * @throws IllegalArgumentException + * if size <= 0. + */ + public TcpBufferedOutputStream(OutputStream out, int size) { + super(out); + if (size <= 0) { + throw new IllegalArgumentException("Buffer size <= 0"); + } + buffer = new byte[size]; + bufferlen = size; + } + + /** + * write a byte on to the stream + * + * @param b + * - byte to write + * @throws IOException + */ + @Override + public void write(int b) throws IOException { + if ((bufferlen - count) < 1) { + flush(); + } + buffer[count++] = (byte) b; + } + + /** + * write a byte array to the stream + * + * @param b + * the byte buffer + * @param off + * the offset into the buffer + * @param len + * the length of data to write + * @throws IOException + */ + @Override + public void write(byte b[], int off, int len) throws IOException { + if (b != null) { + if ((bufferlen - count) < len) { + flush(); + } + if (buffer.length >= len) { + System.arraycopy(b, off, buffer, count, len); + count += len; + } else { + out.write(b, off, len); + } + } + } + + /** + * flush the data to the output stream This doesn't call flush on the underlying + * outputstream, because Tcp is particularly efficent at doing this itself .... + * + * @throws IOException + */ + @Override + public void flush() throws IOException { + if (count > 0 && out != null) { + out.write(buffer, 0, count); + count = 0; + } + } + + /** + * close this stream + * + * @throws IOException + */ + @Override + public void close() throws IOException { + super.close(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java new file mode 100644 index 0000000..fd312f8 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.vertx; + +import java.io.IOException; +import java.net.URI; + +import org.apache.qpid.jms.JmsSslContext; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.transports.TransportListener; +import org.vertx.java.core.net.NetClient; + +/** + * Provides SSL configuration to the Vert.x NetClient object used by the underling + * TCP based Transport. + */ +public class SslTransport extends TcpTransport { + + // TODO - remove with SSL configuration placed in Transport options. + private JmsSslContext context; + + /** + * Create a new transport instance + * + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. + */ + public SslTransport(URI remoteLocation, TransportOptions options) { + super(null, remoteLocation, options); + } + + /** + * Create a new transport instance + * + * @param listener + * the TransportListener that will receive events from this Transport. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. + */ + public SslTransport(TransportListener listener, URI remoteLocation, TransportOptions options) { + super(listener, remoteLocation, options); + } + + @Override + protected void configureNetClient(NetClient client, TransportOptions options) throws IOException { + super.configureNetClient(client, options); + + client.setSSL(true); + client.setKeyStorePath(context.getKeyStoreLocation()); + client.setKeyStorePassword(context.getKeyStorePassword()); + client.setTrustStorePath(context.getTrustStoreLocation()); + client.setTrustStorePassword(context.getTrustStorePassword()); + } + + /** + * @return the context + */ + public JmsSslContext getContext() { + return context; + } + + /** + * @param context the context to set + */ + public void setContext(JmsSslContext context) { + this.context = context; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java new file mode 100644 index 0000000..b7c738c --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.vertx; + +import java.net.URI; + +import org.apache.qpid.jms.JmsSslContext; +import org.apache.qpid.jms.transports.TransportOptions; + +/** + * Create an SslTransport instance. + */ +public class SslTransportFactory extends TcpTransportFactory { + + @Override + protected TcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception { + SslTransport transport = new SslTransport(remoteURI, transportOptions); + + transport.setContext(JmsSslContext.getCurrentSslContext()); + + return transport; + } + + @Override + public String getName() { + return "SSL"; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java new file mode 100644 index 0000000..e824ec4 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.vertx; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.transports.Transport; +import org.apache.qpid.jms.transports.TransportListener; +import org.apache.qpid.jms.util.IOExceptionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.vertx.java.core.AsyncResult; +import org.vertx.java.core.AsyncResultHandler; +import org.vertx.java.core.Handler; +import org.vertx.java.core.Vertx; +import org.vertx.java.core.buffer.Buffer; +import org.vertx.java.core.impl.DefaultVertxFactory; +import org.vertx.java.core.net.NetClient; +import org.vertx.java.core.net.NetSocket; + +/** + * Vertex based TCP transport for raw data packets. + */ +public class TcpTransport implements Transport { + + private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class); + + private final Vertx vertx; + private final NetClient client; + private final URI remoteLocation; + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>(); + + private NetSocket socket; + private TransportListener listener; + private TransportOptions options; + + /** + * Create a new transport instance + * + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. + */ + public TcpTransport(URI remoteLocation, TransportOptions options) { + this(null, remoteLocation, options); + } + + /** + * Create a new transport instance + * + * @param listener + * the TransportListener that will receive events from this Transport. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. + */ + public TcpTransport(TransportListener listener, URI remoteLocation, TransportOptions options) { + this.options = options; + this.listener = listener; + this.remoteLocation = remoteLocation; + + DefaultVertxFactory vertxFactory = new DefaultVertxFactory(); + this.vertx = vertxFactory.createVertx(); + this.client = vertx.createNetClient(); + } + + @Override + public void connect() throws IOException { + final CountDownLatch connectLatch = new CountDownLatch(1); + + if (listener == null) { + throw new IllegalStateException("A transport listener must be set before connection attempts."); + } + + configureNetClient(client, getTransportOptions()); + + try { + client.connect(remoteLocation.getPort(), remoteLocation.getHost(), new AsyncResultHandler<NetSocket>() { + @Override + public void handle(AsyncResult<NetSocket> asyncResult) { + if (asyncResult.succeeded()) { + socket = asyncResult.result(); + LOG.info("We have connected! Socket is {}", socket); + + connected.set(true); + connectLatch.countDown(); + + socket.dataHandler(new Handler<Buffer>() { + @Override + public void handle(Buffer event) { + listener.onData(event.getByteBuf()); + } + }); + + socket.closeHandler(new Handler<Void>() { + @Override + public void handle(Void event) { + if (!closed.get()) { + connected.set(false); + listener.onTransportClosed(); + } + } + }); + + socket.exceptionHandler(new Handler<Throwable>() { + @Override + public void handle(Throwable event) { + if (!closed.get()) { + connected.set(false); + listener.onTransportError(event); + } + } + }); + + } else { + connected.set(false); + connectionError.set(asyncResult.cause()); + connectLatch.countDown(); + } + } + }); + } catch (Throwable reason) { + LOG.info("Failed to connect to target Broker: {}", reason); + throw IOExceptionSupport.create(reason); + } + + try { + connectLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + if (connectionError.get() != null) { + throw IOExceptionSupport.create(connectionError.get()); + } + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + if (connected.get()) { + socket.close(); + connected.set(false); + } + + vertx.stop(); + } + } + + @Override + public void send(ByteBuffer output) throws IOException { + checkConnected(); + int length = output.remaining(); + if (length == 0) { + return; + } + + byte[] copy = new byte[length]; + output.get(copy); + Buffer sendBuffer = new Buffer(copy); + + vertx.eventBus().send(socket.writeHandlerID(), sendBuffer); + } + + @Override + public void send(ByteBuf output) throws IOException { + checkConnected(); + int length = output.readableBytes(); + if (length == 0) { + return; + } + + Buffer sendBuffer = new Buffer(output.copy()); + vertx.eventBus().send(socket.writeHandlerID(), sendBuffer); + } + + /** + * Allows a subclass to configure the NetClient beyond what this transport might do. + * + * @throws IOException if an error occurs. + */ + protected void configureNetClient(NetClient client, TransportOptions options) throws IOException { + client.setSendBufferSize(options.getSendBufferSize()); + client.setReceiveBufferSize(options.getReceiveBufferSize()); + client.setSoLinger(options.getSoLinger()); + client.setTCPKeepAlive(options.isTcpKeepAlive()); + client.setTCPNoDelay(options.isTcpNoDelay()); + if (options.getConnectTimeout() >= 0) { + client.setConnectTimeout(options.getConnectTimeout()); + } + } + + @Override + public boolean isConnected() { + return this.connected.get(); + } + + private void checkConnected() throws IOException { + if (!connected.get()) { + throw new IOException("Cannot send to a non-connected transport."); + } + } + + @Override + public TransportListener getTransportListener() { + return this.listener; + } + + @Override + public void setTransportListener(TransportListener listener) { + if (listener == null) { + throw new IllegalArgumentException("Listener cannot be set to null"); + } + + this.listener = listener; + } + + /** + * @return the options used to configure the TCP socket. + */ + public TransportOptions getTransportOptions() { + if (options == null) { + options = TransportOptions.DEFAULT_OPTIONS; + } + + return options; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java new file mode 100644 index 0000000..8385dff --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.vertx; + +import java.net.URI; +import java.util.Map; + +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.transports.Transport; +import org.apache.qpid.jms.transports.TransportFactory; +import org.apache.qpid.jms.util.PropertyUtil; + +/** + * Factory for creating the Vert.x based TCP Transport + */ +public class TcpTransportFactory extends TransportFactory { + + @Override + public Transport createTransport(URI remoteURI) throws Exception { + + Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery()); + Map<String, String> transportURIOptions = PropertyUtil.filterProperties(map, "transport."); + + remoteURI = PropertyUtil.replaceQuery(remoteURI, map); + + TransportOptions transportOptions = new TransportOptions(); + + if (!PropertyUtil.setProperties(transportOptions, transportURIOptions)) { + String msg = " Not all transport options could be set on the Transport." + + " Check the options are spelled correctly." + + " Given parameters=[" + transportURIOptions + "]." + + " This provider instance cannot be started."; + throw new IllegalArgumentException(msg); + } + + Transport result = doCreateTransport(remoteURI, transportOptions); + + return result; + } + + protected TcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception { + return new TcpTransport(remoteURI, transportOptions); + } + + @Override + public String getName() { + return "TCP"; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl new file mode 100644 index 0000000..eed5a6b --- /dev/null +++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.qpid.jms.transports.vertx.SslTransportFactory http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp new file mode 100644 index 0000000..533bcd1 --- /dev/null +++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.qpid.jms.transports.vertx.TcpTransportFactory http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java index f4c5918..84197a4 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java @@ -29,9 +29,9 @@ import java.util.List; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.Wait; -import org.apache.qpid.jms.transports.NettyTcpTransport; -import org.apache.qpid.jms.transports.TcpTransportOptions; +import org.apache.qpid.jms.transports.TransportOptions; import org.apache.qpid.jms.transports.TransportListener; +import org.apache.qpid.jms.transports.netty.NettyTcpTransport; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +48,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { private final List<ByteBuf> data = new ArrayList<ByteBuf>(); private final TransportListener testListener = new NettyTransportListener(); - private final TcpTransportOptions testOptions = new TcpTransportOptions(); + private final TransportOptions testOptions = new TransportOptions(); @Test(timeout = 60 * 1000) public void testConnectToServer() throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org