Repository: qpid-jms
Updated Branches:
  refs/heads/master 24bd2a919 -> abde5ef20


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java
new file mode 100644
index 0000000..bb996f5
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.plain;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.jms.util.InetAddressUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class PlainTcpTransport implements Transport, Runnable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PlainTcpTransport.class);
+
+    private TransportListener listener;
+    private final URI remoteLocation;
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicReference<Throwable> connectionError = new 
AtomicReference<Throwable>();
+
+    private final Socket socket;
+    private DataOutputStream dataOut;
+    private DataInputStream dataIn;
+    private Thread runner;
+
+    private TransportOptions options;
+
+    private boolean closeAsync = true;
+    private boolean useLocalHost = false;
+    private int ioBufferSize = 8 * 1024;
+
+    /**
+     * Create a new transport instance that has no TransportListener assigned yet.
+     * Delegates to the three argument constructor, passing a null listener.
+     *
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public PlainTcpTransport(URI remoteLocation, TransportOptions options) {
+        this(null, remoteLocation, options);
+    }
+
+    /**
+     * Create a new transport instance and eagerly create its (unconnected) socket.
+     *
+     * A failure to create the socket is not thrown from here; the IOException is
+     * recorded as the connection error and surfaces when connect() is called.
+     *
+     * @param listener
+     *        the TransportListener that will receive events from this Transport.
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public PlainTcpTransport(TransportListener listener, URI remoteLocation, TransportOptions options) {
+        this.listener = listener;
+        this.remoteLocation = remoteLocation;
+        this.options = options;
+
+        Socket created = null;
+        try {
+            created = createSocketFactory().createSocket();
+        } catch (IOException creationFailure) {
+            // Remembered so that connect() can report it.
+            connectionError.set(creationFailure);
+        }
+
+        this.socket = created;
+    }
+
+    /**
+     * Connects the socket to the remote location, applies the configured socket
+     * options, wraps the socket streams and starts the thread that reads inbound data.
+     *
+     * If any step of the setup fails the socket is closed before the error is
+     * propagated, and the transport is only marked as connected once the socket
+     * and its streams are fully initialized.
+     *
+     * @throws IOException
+     *         if creating the socket failed in the constructor, or if connecting or
+     *         initializing the socket or its streams fails.
+     * @throws IllegalStateException
+     *         if there is no socket available to connect.
+     */
+    @Override
+    public void connect() throws IOException {
+        if (connectionError.get() != null) {
+            throw IOExceptionSupport.create(connectionError.get());
+        }
+
+        if (socket == null) {
+            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
+        }
+
+        InetSocketAddress remoteAddress = null;
+
+        if (remoteLocation != null) {
+            String host = resolveHostName(remoteLocation.getHost());
+            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
+        }
+
+        boolean initialized = false;
+        try {
+            socket.connect(remoteAddress);
+            initialiseSocket(socket);
+            initializeStreams();
+            initialized = true;
+        } finally {
+            if (!initialized) {
+                // Do not leak a half configured socket when setup fails part way through.
+                try {
+                    socket.close();
+                } catch (IOException ignored) {
+                    LOG.trace("Ignoring error while closing socket after a failed connect", ignored);
+                }
+            }
+        }
+
+        // Must be set before the reader thread starts, its loop runs while connected.
+        connected.set(true);
+
+        runner = new Thread(null, this, "QpidJMS " + getClass().getSimpleName() + ": " + toString());
+        runner.setDaemon(false);
+        runner.start();
+    }
+
+    /**
+     * Closes this transport. Only the first call has any effect.
+     *
+     * The transport is marked as disconnected before the socket is closed so that
+     * the read loop stops and later sends are rejected. When asynchronous close is
+     * enabled the socket is closed on a helper thread and this method waits at most
+     * one second for that to complete; otherwise the socket is closed inline.
+     * Errors raised while closing the socket are logged and ignored.
+     *
+     * @throws IOException
+     *         declared by the Transport contract; not thrown by this implementation.
+     */
+    @Override
+    public void close() throws IOException {
+        if (!closed.compareAndSet(false, true)) {
+            return;
+        }
+
+        // A closed transport is no longer connected: stops run() and fails send().
+        connected.set(false);
+
+        if (socket == null) {
+            return;
+        }
+
+        if (!closeAsync) {
+            closeSocket();
+            return;
+        }
+
+        // Closing can block if the socket is hung, so do it on another thread and
+        // give up waiting after a short delay.
+        final CountDownLatch latch = new CountDownLatch(1);
+        final ExecutorService closer = Executors.newSingleThreadExecutor();
+        closer.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    closeSocket();
+                } finally {
+                    latch.countDown();
+                }
+            }
+        });
+
+        try {
+            latch.await(1, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } finally {
+            closer.shutdownNow();
+        }
+    }
+
+    /**
+     * Closes the socket, logging rather than propagating any IOException.
+     */
+    private void closeSocket() {
+        LOG.trace("Closing socket {}", socket);
+        try {
+            socket.close();
+            LOG.debug("Closed socket {}", socket);
+        } catch (IOException e) {
+            LOG.debug("Caught exception closing socket {}. This exception will be ignored.", socket, e);
+        }
+    }
+
+    /**
+     * Writes the remaining bytes of the given buffer to the socket output stream
+     * and then flushes the stream.
+     *
+     * @param output
+     *        the buffer whose remaining bytes are to be sent.
+     * @throws IOException
+     *         if the transport is not connected or the write fails.
+     */
+    @Override
+    public void send(ByteBuffer output) throws IOException {
+        checkConnected();
+        // Trace level: this is on the hot path and runs for every outbound packet.
+        LOG.trace("PlainTcpTransport sending packet of size: {}", output.remaining());
+        WritableByteChannel channel = Channels.newChannel(dataOut);
+        // The channel contract allows partial writes, so drain the buffer fully.
+        while (output.hasRemaining()) {
+            channel.write(output);
+        }
+        dataOut.flush();
+    }
+
+    /**
+     * Sends the contents of a Netty buffer by handing its NIO buffer view to
+     * the ByteBuffer based send method.
+     *
+     * @param output
+     *        the Netty buffer to send.
+     * @throws IOException
+     *         if the transport is not connected or the write fails.
+     */
+    @Override
+    public void send(ByteBuf output) throws IOException {
+        checkConnected();
+        ByteBuffer nioView = output.nioBuffer();
+        send(nioView);
+    }
+
+    /**
+     * @return the current value of this transport's connected flag.
+     */
+    @Override
+    public boolean isConnected() {
+        return this.connected.get();
+    }
+
+    /**
+     * @return the listener currently assigned to this transport, may be null if
+     *         none was supplied at construction or set afterwards.
+     */
+    @Override
+    public TransportListener getTransportListener() {
+        return this.listener;
+    }
+
+    /**
+     * Assigns the listener that receives data and error events from this transport.
+     *
+     * @param listener
+     *        the new listener, must not be null.
+     * @throws IllegalArgumentException
+     *         if the given listener is null.
+     */
+    @Override
+    public void setTransportListener(TransportListener listener) {
+        if (listener != null) {
+            this.listener = listener;
+            return;
+        }
+
+        throw new IllegalArgumentException("Listener cannot be set to null");
+    }
+
+    /**
+     * Returns the options used to configure the socket. When no options were
+     * supplied the shared default options are adopted and returned.
+     *
+     * @return the transport options, never null.
+     */
+    public TransportOptions getTransportOptions() {
+        if (options != null) {
+            return options;
+        }
+
+        options = TransportOptions.DEFAULT_OPTIONS;
+        return options;
+    }
+
+    /**
+     * @return true if a remote host name that equals the local host name is
+     *         replaced with "localhost" when resolving the address to connect to.
+     */
+    public boolean isUseLocalHost() {
+        return useLocalHost;
+    }
+
+    /**
+     * @param useLocalHost
+     *        whether the local host name should be mapped to "localhost" when resolving.
+     */
+    public void setUseLocalHost(boolean useLocalHost) {
+        this.useLocalHost = useLocalHost;
+    }
+
+    /**
+     * @return the buffer size, in bytes, used for the buffered socket streams.
+     */
+    public int getIoBufferSize() {
+        return ioBufferSize;
+    }
+
+    /**
+     * @param ioBufferSize
+     *        the buffer size, in bytes, to use when the socket streams are created.
+     */
+    public void setIoBufferSize(int ioBufferSize) {
+        this.ioBufferSize = ioBufferSize;
+    }
+
+    /**
+     * @return true if close() closes the socket on a separate thread and only
+     *         waits a bounded time for it to complete.
+     */
+    public boolean isCloseAsync() {
+        return closeAsync;
+    }
+
+    /**
+     * @param closeAsync
+     *        whether the socket should be closed asynchronously by close().
+     */
+    public void setCloseAsync(boolean closeAsync) {
+        this.closeAsync = closeAsync;
+    }
+
+    //---------- Transport internal implementation 
---------------------------//
+
+    /**
+     * Body of the reader thread: repeatedly polls the socket for data while the
+     * transport is connected. A failure ends the loop, is recorded as the
+     * connection error, marks the transport as disconnected and, unless the
+     * transport has already been closed, is reported to the listener.
+     */
+    @Override
+    public void run() {
+        LOG.trace("TCP consumer thread for {} starting", this);
+        try {
+            while (isConnected()) {
+                doRun();
+            }
+        } catch (IOException e) {
+            handleReadFailure(e);
+        } catch (Throwable e) {
+            handleReadFailure(new IOException("Unexpected error occured: " + e, e));
+        }
+    }
+
+    /**
+     * Records a fatal read error and notifies the listener when appropriate.
+     */
+    private void handleReadFailure(IOException error) {
+        connectionError.set(error);
+        // The read loop has terminated so the transport can no longer be used.
+        connected.set(false);
+        // Errors provoked by our own close() are expected and not worth reporting.
+        if (!closed.get()) {
+            onException(error);
+        }
+    }
+
+    /**
+     * Performs one iteration of the read loop. When no bytes are available the
+     * thread pauses briefly and returns; otherwise every available byte is read
+     * and handed to the listener wrapped in a Netty buffer.
+     *
+     * @throws IOException
+     *         if reading from the socket fails, or if the thread is interrupted
+     *         while waiting for data (reported as an InterruptedIOException with
+     *         the thread's interrupt status restored).
+     */
+    protected void doRun() throws IOException {
+        int size = dataIn.available();
+        if (size <= 0) {
+            try {
+                TimeUnit.NANOSECONDS.sleep(1);
+            } catch (InterruptedException e) {
+                // Preserve the interrupt and stop the loop instead of silently spinning on.
+                Thread.currentThread().interrupt();
+                java.io.InterruptedIOException interrupted =
+                    new java.io.InterruptedIOException("Interrupted while waiting for data");
+                interrupted.initCause(e);
+                throw interrupted;
+            }
+            return;
+        }
+
+        byte[] buffer = new byte[size];
+        dataIn.readFully(buffer);
+        listener.onData(Unpooled.wrappedBuffer(buffer));
+    }
+
+    /**
+     * Forwards an IO error to the transport listener, if one is assigned. A
+     * RuntimeException thrown by the listener is logged and not propagated.
+     *
+     * @param e
+     *        the error to report.
+     */
+    public void onException(IOException e) {
+        if (listener == null) {
+            return;
+        }
+
+        try {
+            listener.onTransportError(e);
+        } catch (RuntimeException e2) {
+            LOG.debug("Unexpected runtime exception: " + e2, e2);
+        }
+    }
+
+    /**
+     * Provides the factory used by the constructor to create the socket. This
+     * implementation returns the JVM default SocketFactory.
+     *
+     * @return the SocketFactory to create the transport's socket with.
+     * @throws IOException
+     *         declared for implementations that can fail; not thrown here.
+     */
+    protected SocketFactory createSocketFactory() throws IOException {
+        return SocketFactory.getDefault();
+    }
+
+    /**
+     * Applies the configured transport options to the given socket. Failures to
+     * set the receive or send buffer size are logged and ignored; failures to
+     * apply any of the other options are propagated.
+     *
+     * @param sock
+     *        the socket to configure.
+     * @throws SocketException
+     *         if the timeout, keep alive, no delay or linger option cannot be set.
+     * @throws IllegalArgumentException
+     *         if an option value is rejected by the socket.
+     */
+    protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
+        TransportOptions opts = getTransportOptions();
+
+        // Buffer sizes are best effort only.
+        try {
+            sock.setReceiveBufferSize(opts.getReceiveBufferSize());
+        } catch (SocketException receiveError) {
+            LOG.warn("Cannot set socket receive buffer size = {}", opts.getReceiveBufferSize());
+            LOG.debug("Cannot set socket receive buffer size. Reason: {}. This exception is ignored.", receiveError.getMessage(), receiveError);
+        }
+
+        try {
+            sock.setSendBufferSize(opts.getSendBufferSize());
+        } catch (SocketException sendError) {
+            LOG.warn("Cannot set socket send buffer size = {}", opts.getSendBufferSize());
+            LOG.debug("Cannot set socket send buffer size. Reason: {}. This exception is ignored.", sendError.getMessage(), sendError);
+        }
+
+        sock.setSoTimeout(opts.getSoTimeout());
+        sock.setKeepAlive(opts.isTcpKeepAlive());
+        sock.setTcpNoDelay(opts.isTcpNoDelay());
+
+        // Linger is only enabled for a positive value, otherwise it is switched off.
+        int linger = opts.getSoLinger();
+        if (linger > 0) {
+            sock.setSoLinger(true, linger);
+        } else {
+            sock.setSoLinger(false, 0);
+        }
+    }
+
+    /**
+     * Wraps the socket's input and output streams in buffered data streams sized
+     * by the configured IO buffer size.
+     *
+     * @throws IOException
+     *         if the streams cannot be obtained or wrapped; any Throwable raised
+     *         is converted to an IOException.
+     */
+    protected void initializeStreams() throws IOException {
+        try {
+            this.dataIn = new DataInputStream(
+                new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize));
+            this.dataOut = new DataOutputStream(
+                new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize));
+        } catch (Throwable failure) {
+            throw IOExceptionSupport.create(failure);
+        }
+    }
+
+    /**
+     * Determines the host name to connect to. When the use local host option is
+     * enabled and the given host equals the local host name, "localhost" is
+     * returned; in every other case the host is returned unchanged.
+     *
+     * @param host
+     *        the host name taken from the remote location.
+     * @return the host name to use for the connection.
+     * @throws UnknownHostException
+     *         if the local host name cannot be determined.
+     */
+    protected String resolveHostName(String host) throws UnknownHostException {
+        if (!isUseLocalHost()) {
+            return host;
+        }
+
+        String localName = InetAddressUtil.getLocalHostName();
+        boolean targetsLocalHost = localName != null && localName.equals(host);
+        return targetsLocalHost ? "localhost" : host;
+    }
+
+    /**
+     * Guards send operations against use of a transport that is not connected.
+     *
+     * @throws IOException
+     *         if the transport is not currently connected.
+     */
+    private void checkConnected() throws IOException {
+        if (connected.get()) {
+            return;
+        }
+
+        throw new IOException("Cannot send to a non-connected transport.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java
new file mode 100644
index 0000000..3894408
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransportFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.plain;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportFactory;
+import org.apache.qpid.jms.util.PropertyUtil;
+
+/**
+ * Factory for creating the Plain TCP transport.
+ */
+public class PlainTcpTransportFactory extends TransportFactory {
+
+    /**
+     * Creates a transport for the given URI. Query parameters prefixed with
+     * "transport." are applied to a new TransportOptions instance, and the URI
+     * query is rebuilt from the parsed query map before the transport is created.
+     * NOTE(review): presumably filterProperties removes the matched entries from
+     * the query map - confirm against PropertyUtil.
+     *
+     * @param remoteURI
+     *        the URI of the remote peer, optionally carrying transport options.
+     * @return the newly created, not yet connected, transport.
+     * @throws IllegalArgumentException
+     *         if any of the given transport options could not be applied.
+     */
+    @Override
+    public Transport createTransport(URI remoteURI) throws Exception {
+        Map<String, String> queryOptions = PropertyUtil.parseQuery(remoteURI.getQuery());
+        Map<String, String> transportURIOptions = PropertyUtil.filterProperties(queryOptions, "transport.");
+
+        remoteURI = PropertyUtil.replaceQuery(remoteURI, queryOptions);
+
+        TransportOptions transportOptions = new TransportOptions();
+        boolean allApplied = PropertyUtil.setProperties(transportOptions, transportURIOptions);
+        if (allApplied) {
+            return doCreateTransport(remoteURI, transportOptions);
+        }
+
+        throw new IllegalArgumentException(
+            " Not all transport options could be set on the Transport." +
+            " Check the options are spelled correctly." +
+            " Given parameters=[" + transportURIOptions + "]." +
+            " This provider instance cannot be started.");
+    }
+
+    /**
+     * Instantiates the transport for the given URI and options. Protected so
+     * that it can be overridden to supply a different transport implementation.
+     *
+     * @param remoteURI
+     *        the URI of the remote peer.
+     * @param transportOptions
+     *        the options to configure the transport with.
+     * @return a new PlainTcpTransport instance.
+     * @throws Exception
+     *         if the transport cannot be created.
+     */
+    protected PlainTcpTransport doCreateTransport(URI remoteURI, 
TransportOptions transportOptions) throws Exception {
+        return new PlainTcpTransport(remoteURI, transportOptions);
+    }
+
+    /**
+     * @return the name of this transport factory, "TCP".
+     */
+    @Override
+    public String getName() {
+        return "TCP";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java
new file mode 100644
index 0000000..ee4bf5c
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedInputStream.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.plain;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An optimized buffered input stream for Tcp
+ */
+public class TcpBufferedInputStream extends FilterInputStream {
+
+    private static final int DEFAULT_BUFFER_SIZE = 8192;
+    protected byte internalBuffer[];
+    protected int count;
+    protected int position;
+
+    /**
+     * Creates a buffered stream over the given input using the default buffer
+     * size of 8192 bytes.
+     *
+     * @param in
+     *        the underlying input stream.
+     */
+    public TcpBufferedInputStream(InputStream in) {
+        this(in, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a buffered stream over the given input with the given buffer size.
+     *
+     * @param in
+     *        the underlying input stream.
+     * @param size
+     *        the size of the internal buffer in bytes.
+     * @throws IllegalArgumentException
+     *         if size <= 0.
+     */
+    public TcpBufferedInputStream(InputStream in, int size) {
+        super(in);
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        internalBuffer = new byte[size];
+    }
+
+    /**
+     * Discards the buffered data and refills the internal buffer with a single
+     * read from the underlying stream. The buffer is left empty if that read
+     * returns no data or signals end of stream.
+     *
+     * @throws IOException
+     *         if the underlying read fails.
+     */
+    protected void fill() throws IOException {
+        position = 0;
+        count = 0;
+        byte[] target = internalBuffer;
+        int bytesRead = in.read(target, 0, target.length);
+        if (bytesRead > 0) {
+            count = bytesRead;
+        }
+    }
+
+    /**
+     * Reads a single byte, refilling the internal buffer when it is exhausted.
+     *
+     * @return the next byte as a value from 0 to 255, or -1 if no data could be
+     *         obtained from the underlying stream.
+     * @throws IOException
+     *         if the underlying read fails.
+     */
+    @Override
+    public int read() throws IOException {
+        boolean exhausted = position >= count;
+        if (exhausted) {
+            fill();
+            exhausted = position >= count;
+        }
+        if (exhausted) {
+            return -1;
+        }
+        return internalBuffer[position++] & 0xff;
+    }
+
+    /**
+     * Performs one read step into the given array. Buffered bytes are served
+     * first. With an empty buffer, a request at least as large as the buffer
+     * reads straight from the underlying stream; a smaller request refills the
+     * buffer first.
+     *
+     * @return the number of bytes copied, the result of the direct read, or -1
+     *         if the buffer could not be refilled.
+     */
+    private int readStream(byte[] b, int off, int len) throws IOException {
+        int buffered = count - position;
+        if (buffered <= 0) {
+            // Large requests bypass the internal buffer entirely.
+            if (len >= internalBuffer.length) {
+                return in.read(b, off, len);
+            }
+            fill();
+            buffered = count - position;
+            if (buffered <= 0) {
+                return -1;
+            }
+        }
+
+        int toCopy = Math.min(buffered, len);
+        System.arraycopy(internalBuffer, position, b, off, toCopy);
+        position += toCopy;
+        return toCopy;
+    }
+
+    /**
+     * Reads up to len bytes into the given array, repeating read steps until the
+     * request is satisfied, a step yields no data, or the underlying stream
+     * reports that nothing further is available.
+     *
+     * @param b
+     *        the destination array.
+     * @param off
+     *        the offset in the array at which to start storing bytes.
+     * @param len
+     *        the maximum number of bytes to read.
+     * @return the number of bytes read, 0 if len is 0, or the result of the
+     *         first read step if it produced no data.
+     * @throws IndexOutOfBoundsException
+     *         if off and len do not describe a valid range of the array.
+     * @throws IOException
+     *         if the underlying read fails.
+     */
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            return 0;
+        }
+
+        int total = 0;
+        while (true) {
+            int chunk = readStream(b, off + total, len - total);
+            if (chunk <= 0) {
+                return (total == 0) ? chunk : total;
+            }
+            total += chunk;
+            if (total >= len) {
+                return total;
+            }
+            // Stop rather than block when the source has nothing more right now.
+            InputStream source = in;
+            if (source != null && source.available() <= 0) {
+                return total;
+            }
+        }
+    }
+
+    /**
+     * Skips over bytes. Buffered bytes are skipped first, and only those; when
+     * the buffer is empty the request is passed to the underlying stream.
+     *
+     * @param n
+     *        the number of bytes to skip.
+     * @return the number of bytes actually skipped, 0 if n is not positive.
+     * @throws IOException
+     *         if skipping on the underlying stream fails.
+     */
+    @Override
+    public long skip(long n) throws IOException {
+        if (n <= 0) {
+            return 0;
+        }
+
+        long buffered = count - position;
+        if (buffered <= 0) {
+            return in.skip(n);
+        }
+
+        long skipped = Math.min(buffered, n);
+        position += skipped;
+        return skipped;
+    }
+
+    /**
+     * Reports how many bytes can be read without blocking: the bytes remaining
+     * in the internal buffer plus whatever the underlying stream reports. The
+     * result is capped at Integer.MAX_VALUE so that the sum cannot overflow
+     * into a negative value.
+     *
+     * @return the number of bytes available to read.
+     * @throws IOException
+     *         if the underlying stream fails to report its available bytes.
+     */
+    @Override
+    public int available() throws IOException {
+        int buffered = count - position;
+        int underlying = in.available();
+        if (underlying > Integer.MAX_VALUE - buffered) {
+            return Integer.MAX_VALUE;
+        }
+        return underlying + buffered;
+    }
+
+    /**
+     * @return false, this stream does not support mark and reset.
+     */
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Closes the underlying input stream, if there is one.
+     *
+     * @throws IOException
+     *         if closing the underlying stream fails.
+     */
+    @Override
+    public void close() throws IOException {
+        if (in != null) {
+            in.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java
new file mode 100644
index 0000000..84359ae
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/TcpBufferedOutputStream.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.plain;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An optimized buffered outputstream for Tcp
+ */
+public class TcpBufferedOutputStream extends FilterOutputStream {
+
+    private static final int BUFFER_SIZE = 8192;
+    private final byte[] buffer;
+    private final int bufferlen;
+    private int count;
+
+    /**
+     * Creates a new buffered output stream that writes to the specified
+     * underlying output stream using the default buffer size of 8192 bytes.
+     *
+     * @param out
+     *        the underlying output stream.
+     */
+    public TcpBufferedOutputStream(OutputStream out) {
+        this(out, BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a new buffered output stream to write data to the specified underlying
+     * output stream with the specified buffer size.
+     *
+     * @param out
+     *        the underlying output stream.
+     * @param size
+     *        the buffer size.
+     * @throws IllegalArgumentException
+     *         if size <= 0.
+     */
+    public TcpBufferedOutputStream(OutputStream out, int size) {
+        super(out);
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buffer = new byte[size];
+        bufferlen = size;
+    }
+
+    /**
+     * Buffers a single byte, first writing out the buffered data if the buffer
+     * is already full.
+     *
+     * @param b
+     *        the byte to write, only the low eight bits are used.
+     * @throws IOException
+     *         if writing the buffered data to the underlying stream fails.
+     */
+    @Override
+    public void write(int b) throws IOException {
+        boolean full = count >= bufferlen;
+        if (full) {
+            flush();
+        }
+        buffer[count++] = (byte) b;
+    }
+
+    /**
+     * Writes a range of a byte array. The buffered data is written out first when
+     * the range does not fit in the remaining buffer space. A range that fits in
+     * the buffer is then buffered, a larger one goes directly to the underlying
+     * stream. A null array is ignored.
+     *
+     * @param b
+     *        the bytes to write.
+     * @param off
+     *        the offset of the first byte to write.
+     * @param len
+     *        the number of bytes to write.
+     * @throws IOException
+     *         if writing to the underlying stream fails.
+     */
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+        if (b == null) {
+            return;
+        }
+
+        int free = bufferlen - count;
+        if (free < len) {
+            flush();
+        }
+
+        if (len > buffer.length) {
+            // Too big to ever fit in the buffer, hand it straight to the stream.
+            out.write(b, off, len);
+            return;
+        }
+
+        System.arraycopy(b, off, buffer, count, len);
+        count += len;
+    }
+
+    /**
+     * Writes any buffered bytes to the underlying output stream and empties the
+     * buffer. This deliberately does not call flush on the underlying stream,
+     * because Tcp is particularly efficient at doing this itself.
+     *
+     * @throws IOException
+     *         if writing to the underlying stream fails.
+     */
+    @Override
+    public void flush() throws IOException {
+        if (count <= 0 || out == null) {
+            return;
+        }
+
+        out.write(buffer, 0, count);
+        count = 0;
+    }
+
+    /**
+     * Closes this stream by delegating to FilterOutputStream.close(), which per
+     * the JDK documentation flushes this stream and then closes the underlying
+     * output stream.
+     *
+     * @throws IOException
+     *         if flushing or closing fails.
+     */
+    @Override
+    public void close() throws IOException {
+        super.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java
new file mode 100644
index 0000000..fd312f8
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransport.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.vertx;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.qpid.jms.JmsSslContext;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.vertx.java.core.net.NetClient;
+
+/**
+ * Provides SSL configuration to the Vert.x NetClient object used by the underlying
+ * TCP based Transport.
+ */
+public class SslTransport extends TcpTransport {
+
+    // TODO - remove with SSL configuration placed in Transport options.
+    private JmsSslContext context;
+
+    /**
+     * Create a new transport instance that has no TransportListener assigned yet.
+     * The SSL context must be supplied separately through setContext.
+     *
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public SslTransport(URI remoteLocation, TransportOptions options) {
+        super(null, remoteLocation, options);
+    }
+
+    /**
+     * Create a new transport instance. The SSL context must be supplied
+     * separately through setContext.
+     *
+     * @param listener
+     *        the TransportListener that will receive events from this Transport.
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public SslTransport(TransportListener listener, URI remoteLocation, 
TransportOptions options) {
+        super(listener, remoteLocation, options);
+    }
+
+    /**
+     * Applies the base TCP configuration to the client and then enables SSL
+     * using the key store and trust store settings of the assigned SSL context.
+     *
+     * @param client
+     *        the Vert.x NetClient to configure.
+     * @param options
+     *        the transport options used to configure the client.
+     * @throws IOException
+     *         if no SSL context has been set on this transport, or if the base
+     *         configuration fails.
+     */
+    @Override
+    protected void configureNetClient(NetClient client, TransportOptions options) throws IOException {
+        // Fail with a clear error before touching the client, not a bare NPE half way through.
+        if (context == null) {
+            throw new IOException("Cannot configure SSL: no JmsSslContext has been set on this transport");
+        }
+
+        super.configureNetClient(client, options);
+
+        client.setSSL(true);
+        client.setKeyStorePath(context.getKeyStoreLocation());
+        client.setKeyStorePassword(context.getKeyStorePassword());
+        client.setTrustStorePath(context.getTrustStoreLocation());
+        client.setTrustStorePassword(context.getTrustStorePassword());
+    }
+
+    /**
+     * @return the SSL context that supplies the key store and trust store
+     *         settings for this transport, may be null if none has been set.
+     */
+    public JmsSslContext getContext() {
+        return context;
+    }
+
+    /**
+     * @param context
+     *        the SSL context whose key store and trust store settings are
+     *        applied when the NetClient is configured.
+     */
+    public void setContext(JmsSslContext context) {
+        this.context = context;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java
new file mode 100644
index 0000000..b7c738c
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/SslTransportFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.vertx;
+
+import java.net.URI;
+
+import org.apache.qpid.jms.JmsSslContext;
+import org.apache.qpid.jms.transports.TransportOptions;
+
+/**
+ * Create an SslTransport instance.
+ */
+public class SslTransportFactory extends TcpTransportFactory {
+
+    /**
+     * Creates an SslTransport for the given URI and options and assigns it the
+     * current JmsSslContext.
+     *
+     * @param remoteURI
+     *        the URI of the remote peer.
+     * @param transportOptions
+     *        the options to configure the transport with.
+     * @return a new SslTransport carrying the current SSL context.
+     * @throws Exception
+     *         if the transport cannot be created.
+     */
+    @Override
+    protected TcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception {
+        SslTransport sslTransport = new SslTransport(remoteURI, transportOptions);
+        JmsSslContext currentContext = JmsSslContext.getCurrentSslContext();
+        sslTransport.setContext(currentContext);
+        return sslTransport;
+    }
+
+    /**
+     * @return the name of this transport factory, "SSL".
+     */
+    @Override
+    public String getName() {
+        return "SSL";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java
new file mode 100644
index 0000000..e824ec4
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.vertx;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.AsyncResult;
+import org.vertx.java.core.AsyncResultHandler;
+import org.vertx.java.core.Handler;
+import org.vertx.java.core.Vertx;
+import org.vertx.java.core.buffer.Buffer;
+import org.vertx.java.core.impl.DefaultVertxFactory;
+import org.vertx.java.core.net.NetClient;
+import org.vertx.java.core.net.NetSocket;
+
+/**
+ * Vert.x based TCP transport for raw data packets.
+ *
+ * Incoming data and socket life-cycle events are forwarded to the configured
+ * {@link TransportListener}. Outgoing data is copied into a Vert.x Buffer and
+ * sent to the socket's write handler through the Vert.x event bus.
+ */
+public class TcpTransport implements Transport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
+
+    private final Vertx vertx;
+    private final NetClient client;
+    private final URI remoteLocation;
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
+
+    private NetSocket socket;
+    private TransportListener listener;
+    private TransportOptions options;
+
+    /**
+     * Create a new transport instance
+     *
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public TcpTransport(URI remoteLocation, TransportOptions options) {
+        this(null, remoteLocation, options);
+    }
+
+    /**
+     * Create a new transport instance
+     *
+     * @param listener
+     *        the TransportListener that will receive events from this Transport.
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public TcpTransport(TransportListener listener, URI remoteLocation, TransportOptions options) {
+        this.options = options;
+        this.listener = listener;
+        this.remoteLocation = remoteLocation;
+
+        DefaultVertxFactory vertxFactory = new DefaultVertxFactory();
+        this.vertx = vertxFactory.createVertx();
+        this.client = vertx.createNetClient();
+    }
+
+    /**
+     * Connects to the remote location and blocks until the attempt completes.
+     *
+     * @throws IllegalStateException if no transport listener has been set.
+     * @throws IOException if the connection attempt fails or the calling thread
+     *         is interrupted while waiting for it to complete.
+     */
+    @Override
+    public void connect() throws IOException {
+        final CountDownLatch connectLatch = new CountDownLatch(1);
+
+        if (listener == null) {
+            throw new IllegalStateException("A transport listener must be set before connection attempts.");
+        }
+
+        // Drop any failure recorded by a previous attempt so that a retry is
+        // judged only on its own outcome.
+        connectionError.set(null);
+
+        configureNetClient(client, getTransportOptions());
+
+        try {
+            client.connect(remoteLocation.getPort(), remoteLocation.getHost(), new AsyncResultHandler<NetSocket>() {
+                @Override
+                public void handle(AsyncResult<NetSocket> asyncResult) {
+                    if (asyncResult.succeeded()) {
+                        socket = asyncResult.result();
+                        LOG.info("We have connected! Socket is {}", socket);
+
+                        connected.set(true);
+                        connectLatch.countDown();
+
+                        socket.dataHandler(new Handler<Buffer>() {
+                            @Override
+                            public void handle(Buffer event) {
+                                listener.onData(event.getByteBuf());
+                            }
+                        });
+
+                        socket.closeHandler(new Handler<Void>() {
+                            @Override
+                            public void handle(Void event) {
+                                // A close requested through close() is not reported to the listener.
+                                if (!closed.get()) {
+                                    connected.set(false);
+                                    listener.onTransportClosed();
+                                }
+                            }
+                        });
+
+                        socket.exceptionHandler(new Handler<Throwable>() {
+                            @Override
+                            public void handle(Throwable event) {
+                                // Errors raised after close() are not reported to the listener.
+                                if (!closed.get()) {
+                                    connected.set(false);
+                                    listener.onTransportError(event);
+                                }
+                            }
+                        });
+
+                    } else {
+                        connected.set(false);
+                        connectionError.set(asyncResult.cause());
+                        connectLatch.countDown();
+                    }
+                }
+            });
+        } catch (Throwable reason) {
+            // The remote location fills the placeholder; the trailing Throwable
+            // is passed separately so it is logged as the exception rather than
+            // competing with the placeholder (behaviour of a lone Throwable
+            // argument differs between SLF4J versions).
+            LOG.info("Failed to connect to target Broker: {}", remoteLocation, reason);
+            throw IOExceptionSupport.create(reason);
+        }
+
+        try {
+            connectLatch.await();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag and fail the attempt: the outcome of the
+            // connect is unknown here, so returning normally would report a
+            // connection that may not exist.
+            Thread.currentThread().interrupt();
+            throw IOExceptionSupport.create(e);
+        }
+
+        if (connectionError.get() != null) {
+            throw IOExceptionSupport.create(connectionError.get());
+        }
+    }
+
+    /**
+     * Closes the socket if it is connected and stops the Vert.x instance owned
+     * by this transport. Only the first call has any effect.
+     *
+     * @throws IOException declared by the Transport contract.
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            try {
+                if (connected.compareAndSet(true, false)) {
+                    socket.close();
+                }
+            } finally {
+                // Always release the Vert.x instance, even if closing the socket fails.
+                vertx.stop();
+            }
+        }
+    }
+
+    /**
+     * Sends the remaining bytes of the given buffer; the bytes are read out of
+     * the buffer into a private copy before being handed to Vert.x.
+     *
+     * @param output
+     *        the buffer whose remaining bytes are sent; nothing is sent if it is empty.
+     *
+     * @throws IOException if the transport is not connected.
+     */
+    @Override
+    public void send(ByteBuffer output) throws IOException {
+        checkConnected();
+        int length = output.remaining();
+        if (length == 0) {
+            return;
+        }
+
+        byte[] copy = new byte[length];
+        output.get(copy);
+        Buffer sendBuffer = new Buffer(copy);
+
+        vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
+    }
+
+    /**
+     * Sends a copy of the readable bytes of the given buffer.
+     *
+     * @param output
+     *        the buffer whose readable bytes are sent; nothing is sent if it is empty.
+     *
+     * @throws IOException if the transport is not connected.
+     */
+    @Override
+    public void send(ByteBuf output) throws IOException {
+        checkConnected();
+        int length = output.readableBytes();
+        if (length == 0) {
+            return;
+        }
+
+        Buffer sendBuffer = new Buffer(output.copy());
+        vertx.eventBus().send(socket.writeHandlerID(), sendBuffer);
+    }
+
+    /**
+     * Allows a subclass to configure the NetClient beyond what this transport might do.
+     *
+     * @param client
+     *        the NetClient to configure.
+     * @param options
+     *        the transport options whose socket settings are applied to the client.
+     *
+     * @throws IOException if an error occurs.
+     */
+    protected void configureNetClient(NetClient client, TransportOptions options) throws IOException {
+        client.setSendBufferSize(options.getSendBufferSize());
+        client.setReceiveBufferSize(options.getReceiveBufferSize());
+        client.setSoLinger(options.getSoLinger());
+        client.setTCPKeepAlive(options.isTcpKeepAlive());
+        client.setTCPNoDelay(options.isTcpNoDelay());
+        // A negative connect timeout is not applied to the client.
+        if (options.getConnectTimeout() >= 0) {
+            client.setConnectTimeout(options.getConnectTimeout());
+        }
+    }
+
+    @Override
+    public boolean isConnected() {
+        return this.connected.get();
+    }
+
+    /**
+     * @throws IOException if the transport is not currently connected.
+     */
+    private void checkConnected() throws IOException {
+        if (!connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
+        }
+    }
+
+    @Override
+    public TransportListener getTransportListener() {
+        return this.listener;
+    }
+
+    /**
+     * @param listener
+     *        the listener that receives events from this transport, must not be null.
+     *
+     * @throws IllegalArgumentException if the listener is null.
+     */
+    @Override
+    public void setTransportListener(TransportListener listener) {
+        if (listener == null) {
+            throw new IllegalArgumentException("Listener cannot be set to null");
+        }
+
+        this.listener = listener;
+    }
+
+    /**
+     * @return the options used to configure the TCP socket; falls back to
+     *         TransportOptions.DEFAULT_OPTIONS when none were supplied.
+     */
+    public TransportOptions getTransportOptions() {
+        if (options == null) {
+            options = TransportOptions.DEFAULT_OPTIONS;
+        }
+
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java
new file mode 100644
index 0000000..8385dff
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransportFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.vertx;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportFactory;
+import org.apache.qpid.jms.util.PropertyUtil;
+
+/**
+ * Transport factory producing the Vert.x based TCP Transport.
+ */
+public class TcpTransportFactory extends TransportFactory {
+
+    /**
+     * @return the name of this factory, "TCP".
+     */
+    @Override
+    public String getName() {
+        return "TCP";
+    }
+
+    /**
+     * Creates a Transport for the given URI. The options obtained by filtering
+     * the URI query on the "transport." prefix are applied to a fresh
+     * TransportOptions instance, and the URI query is rebuilt from the parsed
+     * option map before the transport is created.
+     *
+     * @param remoteURI
+     *        the URI of the remote peer, optionally carrying transport options in its query.
+     *
+     * @return the Transport created by {@link #doCreateTransport(URI, TransportOptions)}.
+     *
+     * @throws IllegalArgumentException if not every transport option could be applied.
+     */
+    @Override
+    public Transport createTransport(URI remoteURI) throws Exception {
+        final Map<String, String> queryOptions = PropertyUtil.parseQuery(remoteURI.getQuery());
+        final Map<String, String> transportURIOptions = PropertyUtil.filterProperties(queryOptions, "transport.");
+        final URI targetURI = PropertyUtil.replaceQuery(remoteURI, queryOptions);
+
+        final TransportOptions transportOptions = new TransportOptions();
+        if (PropertyUtil.setProperties(transportOptions, transportURIOptions)) {
+            return doCreateTransport(targetURI, transportOptions);
+        }
+
+        // At least one supplied option could not be applied; refuse to continue.
+        throw new IllegalArgumentException(
+            " Not all transport options could be set on the Transport." +
+            " Check the options are spelled correctly." +
+            " Given parameters=[" + transportURIOptions + "]." +
+            " This provider instance cannot be started.");
+    }
+
+    /**
+     * Instantiates the concrete transport; subclasses override this to supply
+     * a different TcpTransport type.
+     *
+     * @param remoteURI
+     *        the URI handed to the new transport.
+     * @param transportOptions
+     *        the options handed to the new transport.
+     *
+     * @return a new TcpTransport.
+     */
+    protected TcpTransport doCreateTransport(URI remoteURI, TransportOptions transportOptions) throws Exception {
+        return new TcpTransport(remoteURI, transportOptions);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl
 
b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl
new file mode 100644
index 0000000..eed5a6b
--- /dev/null
+++ 
b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/ssl
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.transports.vertx.SslTransportFactory

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp
 
b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp
new file mode 100644
index 0000000..533bcd1
--- /dev/null
+++ 
b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/transports/tcp
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.transports.vertx.TcpTransportFactory

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/abde5ef2/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
index f4c5918..84197a4 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
@@ -29,9 +29,9 @@ import java.util.List;
 
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.Wait;
-import org.apache.qpid.jms.transports.NettyTcpTransport;
-import org.apache.qpid.jms.transports.TcpTransportOptions;
+import org.apache.qpid.jms.transports.TransportOptions;
 import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.transports.netty.NettyTcpTransport;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,7 +48,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
     private final List<ByteBuf> data = new ArrayList<ByteBuf>();
 
     private final TransportListener testListener = new 
NettyTransportListener();
-    private final TcpTransportOptions testOptions = new TcpTransportOptions();
+    private final TransportOptions testOptions = new TransportOptions();
 
     @Test(timeout = 60 * 1000)
     public void testConnectToServer() throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to