https://issues.apache.org/jira/browse/AMQ-5889
-Cleaned up missing license headers and refactored packages. -Added configuration options for a protocol detection timeout and for the max number of connections accepted at the same time. -Fixed a regression with connection counts -Also added some more tests Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e14aca87 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e14aca87 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e14aca87 Branch: refs/heads/master Commit: e14aca871c779d7f017d3f068719dc53b0351d18 Parents: e5a94bf Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Wed Aug 12 12:55:00 2015 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Thu Aug 13 14:41:29 2015 +0000 ---------------------------------------------------------------------- .../amqp/auto/JMSClientAutoNioPlusSslTest.java | 1 - .../transport/auto/AutoSslTransportFactory.java | 117 ------ .../transport/auto/AutoSslTransportServer.java | 149 -------- .../transport/auto/AutoTcpTransportFactory.java | 110 ------ .../transport/auto/AutoTcpTransportServer.java | 298 --------------- .../transport/auto/AutoTransportUtils.java | 62 ---- .../auto/nio/AutoNIOSSLTransportServer.java | 122 ------ .../transport/auto/nio/AutoNIOTransport.java | 85 ----- .../auto/nio/AutoNioSslTransportFactory.java | 131 ------- .../auto/nio/AutoNioTransportFactory.java | 114 ------ .../protocol/AmqpProtocolVerifier.java | 36 -- .../protocol/MqttProtocolVerifier.java | 45 --- .../protocol/OpenWireProtocolVerifier.java | 67 ---- .../transport/protocol/ProtocolVerifier.java | 9 - .../protocol/StompProtocolVerifier.java | 39 -- .../transport/auto/AutoSslTransportFactory.java | 117 ++++++ .../transport/auto/AutoSslTransportServer.java | 149 ++++++++ .../transport/auto/AutoTcpTransportFactory.java | 102 +++++ .../transport/auto/AutoTcpTransportServer.java | 370 +++++++++++++++++++ .../transport/auto/AutoTransportUtils.java | 62 ++++ .../auto/nio/AutoNIOSSLTransportServer.java | 159 ++++++++ .../transport/auto/nio/AutoNIOTransport.java | 85 +++++ .../auto/nio/AutoNioSslTransportFactory.java | 131 +++++++ .../auto/nio/AutoNioTransportFactory.java | 114 ++++++ .../transport/nio/AutoInitNioSSLTransport.java | 27 +- .../protocol/AmqpProtocolVerifier.java | 37 ++ .../protocol/MqttProtocolVerifier.java | 45 +++ .../protocol/OpenWireProtocolVerifier.java | 67 ++++ .../transport/protocol/ProtocolVerifier.java | 24 ++ .../protocol/StompProtocolVerifier.java | 39 ++ .../services/org/apache/activemq/transport/auto | 2 +- .../org/apache/activemq/transport/auto+nio | 2 +- .../org/apache/activemq/transport/auto+nio+ssl | 2 +- .../org/apache/activemq/transport/auto+ssl | 2 +- .../activemq/transport/nio/NIOSSLTransport.java | 46 +-- .../transport/tcp/TcpTransportServer.java | 79 ++-- .../stomp/auto/AutoStompConnectTimeoutTest.java | 180 +++++++++ .../auto/AutoNIOSslTransportBrokerTest.java | 1 - .../auto/AutoTransportMaxConnectionsTest.java | 148 ++++++++ 39 files changed, 1916 insertions(+), 1459 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java index 0bf2e38..127c6d6 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.transport.amqp.auto; import java.net.URI; -import org.apache.activemq.transport.amqp.JMSClientNioPlusSslTest; import org.apache.activemq.transport.amqp.JMSClientSslTest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportFactory.java deleted file mode 100644 index 7c65311..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportFactory.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.transport.auto; - -import java.io.IOException; -import java.net.Socket; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import javax.net.ServerSocketFactory; -import javax.net.ssl.SSLServerSocketFactory; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerServiceAware; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportServer; -import org.apache.activemq.transport.tcp.SslTransportFactory; -import org.apache.activemq.transport.tcp.SslTransportServer; -import org.apache.activemq.transport.tcp.TcpTransport; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.URISupport; -import org.apache.activemq.wireformat.WireFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class AutoSslTransportFactory extends SslTransportFactory implements BrokerServiceAware { - private static final Logger LOG = LoggerFactory.getLogger(AutoSslTransportFactory.class); - - - protected BrokerService brokerService; - /* (non-Javadoc) - * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) - */ - @Override - public void setBrokerService(BrokerService brokerService) { - this.brokerService = brokerService; - } - - private Set<String> enabledProtocols; - - /** - * Overriding to use SslTransportServer and allow for proper reflection. - */ - @Override - public TransportServer doBind(final URI location) throws IOException { - try { - Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); - - Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto."); - this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols")); - - ServerSocketFactory serverSocketFactory = createServerSocketFactory(); - AutoSslTransportServer server = createAutoSslTransportServer(location, (SSLServerSocketFactory)serverSocketFactory); - if (options.get("allowLinkStealing") != null){ - allowLinkStealingSet = true; - } - IntrospectionSupport.setProperties(server, options); - server.setAutoTransportOptions(IntrospectionSupport.extractProperties(options, "auto.")); - server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport.")); - server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options)); - server.bind(); - - return server; - } catch (URISyntaxException e) { - throw IOExceptionSupport.create(e); - } - } - - boolean allowLinkStealingSet = false; - - /** - * Allows subclasses of SslTransportFactory to create custom instances of - * SslTransportServer. - * - * @param location - * @param serverSocketFactory - * @return - * @throws IOException - * @throws URISyntaxException - */ - // @Override - protected AutoSslTransportServer createAutoSslTransportServer(final URI location, SSLServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { - AutoSslTransportServer server = new AutoSslTransportServer(this, location, serverSocketFactory, - this.brokerService, enabledProtocols) { - @Override - protected TcpTransport createTransport(Socket socket, WireFormat format) - throws IOException { - if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) { - this.setAllowLinkStealing(true); - } - return super.createTransport(socket, format); - } - }; - return server; - } - - -} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportServer.java deleted file mode 100644 index 9954523..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportServer.java +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.broker.transport.auto; - -import java.io.IOException; -import java.net.Socket; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Set; - -import javax.net.ServerSocketFactory; -import javax.net.ssl.SSLServerSocket; -import javax.net.ssl.SSLServerSocketFactory; -import javax.net.ssl.SSLSocket; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.tcp.SslTransport; -import org.apache.activemq.transport.tcp.SslTransportFactory; -import org.apache.activemq.transport.tcp.TcpTransport; -import org.apache.activemq.transport.tcp.TcpTransportFactory; -import org.apache.activemq.wireformat.WireFormat; - -/** - * An SSL TransportServer. - * - * Allows for client certificate authentication (refer to setNeedClientAuth for - * details). - * NOTE: Client certificate authentication is disabled by default. - * - */ -public class AutoSslTransportServer extends AutoTcpTransportServer { - - - - // Specifies if sockets created from this server should needClientAuth. - private boolean needClientAuth; - - // Specifies if sockets created from this server should wantClientAuth. - private boolean wantClientAuth; - -// /** -// * Creates a ssl transport server for the specified url using the provided -// * serverSocketFactory -// * -// * @param transportFactory The factory used to create transports when connections arrive. -// * @param location The location of the broker to bind to. -// * @param serverSocketFactory The factory used to create this server. -// * @throws IOException passed up from TcpTransportFactory. -// * @throws URISyntaxException passed up from TcpTransportFactory. -// */ -// public SslTransportServer(SslTransportFactory transportFactory, URI location, SSLServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { -// super(transportFactory, location, serverSocketFactory); -// } - - public AutoSslTransportServer(SslTransportFactory transportFactory, - URI location, SSLServerSocketFactory serverSocketFactory, - BrokerService brokerService, Set<String> enabledProtocols) throws IOException, URISyntaxException { - super(transportFactory, location, serverSocketFactory, brokerService, enabledProtocols); - // TODO Auto-generated constructor stub - } - - /** - * Sets whether client authentication should be required - * Must be called before {@link #bind()} - * Note: Calling this method clears the wantClientAuth flag - * in the underlying implementation. - */ - public void setNeedClientAuth(boolean needAuth) { - this.needClientAuth = needAuth; - } - - /** - * Returns whether client authentication should be required. - */ - public boolean getNeedClientAuth() { - return this.needClientAuth; - } - - /** - * Returns whether client authentication should be requested. - */ - public boolean getWantClientAuth() { - return this.wantClientAuth; - } - - /** - * Sets whether client authentication should be requested. - * Must be called before {@link #bind()} - * Note: Calling this method clears the needClientAuth flag - * in the underlying implementation. - */ - public void setWantClientAuth(boolean wantAuth) { - this.wantClientAuth = wantAuth; - } - - /** - * Binds this socket to the previously specified URI. - * - * Overridden to allow for proper handling of needClientAuth. - * - * @throws IOException passed up from TcpTransportServer. - */ - @Override - public void bind() throws IOException { - super.bind(); - if (needClientAuth) { - ((SSLServerSocket)this.serverSocket).setNeedClientAuth(true); - } else if (wantClientAuth) { - ((SSLServerSocket)this.serverSocket).setWantClientAuth(true); - } - } - - /** - * Used to create Transports for this server. - * - * Overridden to allow the use of SslTransports (instead of TcpTransports). - * - * @param socket The incoming socket that will be wrapped into the new Transport. - * @param format The WireFormat being used. - * @return The newly return (SSL) Transport. - * @throws IOException - */ - @Override - protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException { - return new SslTransport(format, (SSLSocket)socket, this.initBuffer); - } - - @Override - public boolean isSslServer() { - return true; - } - -} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportFactory.java deleted file mode 100644 index 5731b85..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportFactory.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.transport.auto; - -import java.io.IOException; -import java.net.Socket; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import javax.net.ServerSocketFactory; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerServiceAware; -import org.apache.activemq.openwire.OpenWireFormatFactory; -import org.apache.activemq.transport.MutexTransport; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportFactory; -import org.apache.activemq.transport.TransportFilter; -import org.apache.activemq.transport.TransportServer; -import org.apache.activemq.transport.tcp.TcpTransport; -import org.apache.activemq.transport.tcp.TcpTransportFactory; -import org.apache.activemq.util.FactoryFinder; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.URISupport; -import org.apache.activemq.wireformat.WireFormat; -import org.apache.activemq.wireformat.WireFormatFactory; - -/** - * - * - */ -public class AutoTcpTransportFactory extends TcpTransportFactory implements BrokerServiceAware { - - protected BrokerService brokerService; - /* (non-Javadoc) - * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) - */ - @Override - public void setBrokerService(BrokerService brokerService) { - this.brokerService = brokerService; - } - - - @Override - public TransportServer doBind(final URI location) throws IOException { - try { - Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); - - Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto."); - this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols")); - - ServerSocketFactory serverSocketFactory = createServerSocketFactory(); - AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); - //server.setWireFormatFactory(createWireFormatFactory(options)); - server.setWireFormatFactory(new OpenWireFormatFactory()); - if (options.get("allowLinkStealing") != null){ - allowLinkStealingSet = true; - } - IntrospectionSupport.setProperties(server, options); - server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport.")); - server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options)); - server.bind(); - - return server; - } catch (URISyntaxException e) { - throw IOExceptionSupport.create(e); - } - } - - boolean allowLinkStealingSet = false; - private Set<String> enabledProtocols; - - @Override - protected AutoTcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { - AutoTcpTransportServer server = new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) { - - @Override - protected TcpTransport createTransport(Socket socket, WireFormat format) - throws IOException { - if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) { - this.setAllowLinkStealing(true); - } - return super.createTransport(socket, format); - } - - }; - - return server; - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportServer.java deleted file mode 100644 index 65a0fe5..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportServer.java +++ /dev/null @@ -1,298 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.transport.auto; - -import java.io.IOException; -import java.io.InputStream; -import java.net.Socket; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import javax.net.ServerSocketFactory; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerServiceAware; -import org.apache.activemq.broker.transport.protocol.AmqpProtocolVerifier; -import org.apache.activemq.broker.transport.protocol.MqttProtocolVerifier; -import org.apache.activemq.broker.transport.protocol.OpenWireProtocolVerifier; -import org.apache.activemq.broker.transport.protocol.ProtocolVerifier; -import org.apache.activemq.broker.transport.protocol.StompProtocolVerifier; -import org.apache.activemq.openwire.OpenWireFormatFactory; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportFactory; -import org.apache.activemq.transport.TransportServer; -import org.apache.activemq.transport.tcp.TcpTransport; -import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; -import org.apache.activemq.transport.tcp.TcpTransportFactory; -import org.apache.activemq.transport.tcp.TcpTransportServer; -import org.apache.activemq.util.FactoryFinder; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.wireformat.WireFormat; -import org.apache.activemq.wireformat.WireFormatFactory; -import org.fusesource.hawtbuf.Buffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A TCP based implementation of {@link TransportServer} - */ -public class AutoTcpTransportServer extends TcpTransportServer { - - private static final Logger LOG = LoggerFactory.getLogger(AutoTcpTransportServer.class); - - protected Map<String, Map<String, Object>> wireFormatOptions; - protected Map<String, Object> autoTransportOptions; - protected Set<String> enabledProtocols; - protected final Map<String, ProtocolVerifier> protocolVerifiers = new ConcurrentHashMap<String, ProtocolVerifier>(); - - protected BrokerService brokerService; - - private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/"); - private final ConcurrentMap<String, TransportFactory> transportFactories = new ConcurrentHashMap<String, TransportFactory>(); - - private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/"); - - public WireFormatFactory findWireFormatFactory(String scheme, Map<String, Map<String, Object>> options) throws IOException { - WireFormatFactory wff = null; - try { - wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(scheme); - if (options != null) { - IntrospectionSupport.setProperties(wff, options.get(AutoTransportUtils.ALL)); - IntrospectionSupport.setProperties(wff, options.get(scheme)); - } - if (wff instanceof OpenWireFormatFactory) { - protocolVerifiers.put(AutoTransportUtils.OPENWIRE, new OpenWireProtocolVerifier((OpenWireFormatFactory) wff)); - } - return wff; - } catch (Throwable e) { - throw IOExceptionSupport.create("Could not create wire format factory for: " + scheme + ", reason: " + e, e); - } - } - - public TransportFactory findTransportFactory(String scheme, Map<String, ?> options) throws IOException { - scheme = append(scheme, "nio"); - scheme = append(scheme, "ssl"); - - if (scheme.isEmpty()) { - scheme = "tcp"; - } - - TransportFactory tf = transportFactories.get(scheme); - if (tf == null) { - // Try to load if from a META-INF property. - try { - tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme); - if (options != null) - IntrospectionSupport.setProperties(tf, options); - transportFactories.put(scheme, tf); - } catch (Throwable e) { - throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); - } - } - return tf; - } - - protected String append(String currentScheme, String scheme) { - if (this.getBindLocation().getScheme().contains(scheme)) { - if (!currentScheme.isEmpty()) { - currentScheme += "+"; - } - currentScheme += scheme; - } - return currentScheme; - } - - /** - * @param transportFactory - * @param location - * @param serverSocketFactory - * @throws IOException - * @throws URISyntaxException - */ - public AutoTcpTransportServer(TcpTransportFactory transportFactory, - URI location, ServerSocketFactory serverSocketFactory, BrokerService brokerService, - Set<String> enabledProtocols) - throws IOException, URISyntaxException { - super(transportFactory, location, serverSocketFactory); - service = Executors.newCachedThreadPool(); - this.brokerService = brokerService; - this.enabledProtocols = enabledProtocols; - initProtocolVerifiers(); - } - - @Override - public void setWireFormatFactory(WireFormatFactory factory) { - super.setWireFormatFactory(factory); - initOpenWireProtocolVerifier(); - } - - protected void initProtocolVerifiers() { - initOpenWireProtocolVerifier(); - - if (isAllProtocols() || enabledProtocols.contains(AutoTransportUtils.AMQP)) { - protocolVerifiers.put(AutoTransportUtils.AMQP, new AmqpProtocolVerifier()); - } - if (isAllProtocols() || enabledProtocols.contains(AutoTransportUtils.STOMP)) { - protocolVerifiers.put(AutoTransportUtils.STOMP, new StompProtocolVerifier()); - } - if (isAllProtocols()|| enabledProtocols.contains(AutoTransportUtils.MQTT)) { - protocolVerifiers.put(AutoTransportUtils.MQTT, new MqttProtocolVerifier()); - } - } - - protected void initOpenWireProtocolVerifier() { - if (isAllProtocols() || enabledProtocols.contains(AutoTransportUtils.OPENWIRE)) { - OpenWireProtocolVerifier owpv; - if (wireFormatFactory instanceof OpenWireFormatFactory) { - owpv = new OpenWireProtocolVerifier((OpenWireFormatFactory) wireFormatFactory); - } else { - owpv = new OpenWireProtocolVerifier(new OpenWireFormatFactory()); - } - protocolVerifiers.put(AutoTransportUtils.OPENWIRE, owpv); - } - } - - protected boolean isAllProtocols() { - return enabledProtocols == null || enabledProtocols.isEmpty(); - } - - - protected final ExecutorService service; - - - /** - * This holds the initial buffer that has been read to detect the protocol. - */ - public InitBuffer initBuffer; - - @Override - protected void handleSocket(final Socket socket) { - final AutoTcpTransportServer server = this; - - //This needs to be done in a new thread because - //the socket might be waiting on the client to send bytes - //doHandleSocket can't complete until the protocol can be detected - service.submit(new Runnable() { - @Override - public void run() { - server.doHandleSocket(socket); - } - }); - } - - @Override - protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { - InputStream is = socket.getInputStream(); - - //We need to peak at the first 8 bytes of the buffer to detect the protocol - Buffer magic = new Buffer(8); - magic.readFrom(is); - - ProtocolInfo protocolInfo = detectProtocol(magic.getData()); - - initBuffer = new InitBuffer(8, ByteBuffer.allocate(8)); - initBuffer.buffer.put(magic.getData()); - - if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) { - ((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService); - } - - WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat(); - Transport transport = createTransport(socket, format, protocolInfo.detectedTransportFactory); - - return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory); - } - - @Override - protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException { - return new TcpTransport(format, socket, this.initBuffer); - } - - /** - * @param socket - * @param format - * @param detectedTransportFactory - * @return - */ - protected TcpTransport createTransport(Socket socket, WireFormat format, - TcpTransportFactory detectedTransportFactory) throws IOException { - return createTransport(socket, format); - } - - public void setWireFormatOptions(Map<String, Map<String, Object>> wireFormatOptions) { - this.wireFormatOptions = wireFormatOptions; - } - - public void setEnabledProtocols(Set<String> enabledProtocols) { - this.enabledProtocols = enabledProtocols; - } - - public void setAutoTransportOptions(Map<String, Object> autoTransportOptions) { - this.autoTransportOptions = autoTransportOptions; - if (autoTransportOptions.get("protocols") != null) - this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoTransportOptions.get("protocols")); - } - - protected ProtocolInfo detectProtocol(byte[] buffer) throws IOException { - TcpTransportFactory detectedTransportFactory = transportFactory; - WireFormatFactory detectedWireFormatFactory = wireFormatFactory; - - boolean found = false; - for (String scheme : protocolVerifiers.keySet()) { - if (protocolVerifiers.get(scheme).isProtocol(buffer)) { - LOG.debug("Detected " + scheme); - detectedWireFormatFactory = findWireFormatFactory(scheme, wireFormatOptions); - - if (scheme.equals("default")) { - scheme = ""; - } - - detectedTransportFactory = (TcpTransportFactory) findTransportFactory(scheme, transportOptions); - found = true; - break; - } - } - - if (!found) { - throw new IllegalStateException("Could not detect wire format"); - } - - return new ProtocolInfo(detectedTransportFactory, detectedWireFormatFactory); - - } - - protected class ProtocolInfo { - public final TcpTransportFactory detectedTransportFactory; - public final WireFormatFactory detectedWireFormatFactory; - - public ProtocolInfo(TcpTransportFactory detectedTransportFactory, - WireFormatFactory detectedWireFormatFactory) { - super(); - this.detectedTransportFactory = detectedTransportFactory; - this.detectedWireFormatFactory = detectedWireFormatFactory; - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTransportUtils.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTransportUtils.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTransportUtils.java deleted file mode 100644 index 453292d..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTransportUtils.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.transport.auto; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.activemq.util.IntrospectionSupport; - -/** - * - * - */ -public class AutoTransportUtils { - - //wireformats - public static String ALL = "all"; - public static String OPENWIRE = "default"; - public static String STOMP = "stomp"; - public static String AMQP = "amqp"; - public static String MQTT = "mqtt"; - - //transports - public static String AUTO = "auto"; - - public static Map<String, Map<String, Object>> extractWireFormatOptions(Map<String, String> options ) { - Map<String, Map<String, Object>> wireFormatOptions = new HashMap<>(); - if (options != null) { - wireFormatOptions.put(OPENWIRE, IntrospectionSupport.extractProperties(options, "wireFormat.default.")); - wireFormatOptions.put(STOMP, IntrospectionSupport.extractProperties(options, "wireFormat.stomp.")); - wireFormatOptions.put(AMQP, IntrospectionSupport.extractProperties(options, "wireFormat.amqp.")); - wireFormatOptions.put(MQTT, IntrospectionSupport.extractProperties(options, "wireFormat.mqtt.")); - wireFormatOptions.put(ALL, IntrospectionSupport.extractProperties(options, "wireFormat.")); - } - return wireFormatOptions; - } - - public static Set<String> parseProtocols(String protocolString) { - Set<String> protocolSet = new HashSet<>();; - if (protocolString != null && !protocolString.isEmpty()) { - protocolSet.addAll(Arrays.asList(protocolString.split(","))); - } - return protocolSet; - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOSSLTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOSSLTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOSSLTransportServer.java deleted file mode 100644 index 7ac3a97..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOSSLTransportServer.java +++ /dev/null @@ -1,122 +0,0 @@ -package org.apache.activemq.broker.transport.auto.nio; - -import java.io.IOException; -import java.net.Socket; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.util.Set; - -import javax.net.ServerSocketFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerServiceAware; -import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.nio.AutoInitNioSSLTransport; -import org.apache.activemq.transport.nio.NIOSSLTransport; -import org.apache.activemq.transport.tcp.TcpTransport; -import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; -import org.apache.activemq.transport.tcp.TcpTransportFactory; -import org.apache.activemq.transport.tcp.TcpTransportServer; -import org.apache.activemq.wireformat.WireFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AutoNIOSSLTransportServer extends AutoTcpTransportServer { - - private static final Logger LOG = LoggerFactory.getLogger(AutoNIOSSLTransportServer.class); - - private SSLContext context; - - public AutoNIOSSLTransportServer(SSLContext context, TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory, - BrokerService brokerService, Set<String> enabledProtocols) throws IOException, URISyntaxException { - super(transportFactory, location, serverSocketFactory, brokerService, enabledProtocols); - - this.context = context; - } - - private boolean needClientAuth; - private boolean wantClientAuth; - - protected Transport createTransport(Socket socket, WireFormat format, SSLEngine engine, - InitBuffer initBuffer, ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException { - NIOSSLTransport transport = new NIOSSLTransport(format, socket, engine, initBuffer, inputBuffer); - if (context != null) { - transport.setSslContext(context); - } - - transport.setNeedClientAuth(needClientAuth); - transport.setWantClientAuth(wantClientAuth); - - - return transport; - } - - @Override - protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException { - throw new UnsupportedOperationException("method not supported"); - } - - @Override - public boolean isSslServer() { - return true; - } - - public boolean isNeedClientAuth() { - return this.needClientAuth; - } - - public void setNeedClientAuth(boolean value) { - this.needClientAuth = value; - } - - public boolean isWantClientAuth() { - return this.wantClientAuth; - } - - public void setWantClientAuth(boolean value) { - this.wantClientAuth = value; - } - - - @Override - protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { - - //The SSLEngine needs to be initialized and handshake done to get the first command and detect the format - AutoInitNioSSLTransport in = new AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket); - if (context != null) { - in.setSslContext(context); - } - in.start(); - SSLEngine engine = in.getSslSession(); - - //Wait for handshake to finish initializing - byte[] read = null; - do { - in.serviceRead(); - } while((read = in.read) == null); - - in.stop(); - - initBuffer = new InitBuffer(in.readSize, ByteBuffer.allocate(read.length)); - initBuffer.buffer.put(read); - - ProtocolInfo protocolInfo = detectProtocol(read); - - if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) { - ((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService); - } - - WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat(); - Transport transport = createTransport(socket, format, engine, initBuffer, in.getInputBuffer(), protocolInfo.detectedTransportFactory); - - return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory); - } - - -} - - http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOTransport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOTransport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOTransport.java deleted file mode 100644 index e1b6e71..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOTransport.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.transport.auto.nio; - -import java.io.IOException; -import java.net.Socket; -import java.net.URI; -import java.net.UnknownHostException; - -import javax.net.SocketFactory; - -import org.apache.activemq.transport.nio.NIOTransport; -import org.apache.activemq.wireformat.WireFormat; - -/** - * - * - */ -public class AutoNIOTransport extends NIOTransport { - - public AutoNIOTransport(WireFormat format, Socket socket, - InitBuffer initBuffer) throws IOException { - super(format, socket, initBuffer); - } - - public AutoNIOTransport(WireFormat wireFormat, Socket socket) - throws IOException { - super(wireFormat, socket); - } - - public AutoNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, - URI remoteLocation, URI localLocation) throws UnknownHostException, - IOException { - super(wireFormat, socketFactory, remoteLocation, localLocation); - } - - - boolean doneInitBuffer = false; - - /** - * Read from the initial buffer if it is set - */ - @Override - protected int readFromBuffer() throws IOException { - int readSize = 0; - if (!doneInitBuffer) { - if (initBuffer == null) { - throw new IOException("Null initBuffer"); - } - if (nextFrameSize == -1) { - readSize = 4; - this.initBuffer.buffer.flip(); - for (int i = 0; i < 4; i++) { - currentBuffer.put(initBuffer.buffer.get()); - } - } else { - for (int i = 0; i < 4; i++) { - currentBuffer.put(initBuffer.buffer.get()); - } - readSize = 4; - doneInitBuffer = true; - } - - } else { - readSize += channel.read(currentBuffer); - } - return readSize; - } - - -} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioSslTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioSslTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioSslTransportFactory.java deleted file mode 100644 index 8142a2a..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioSslTransportFactory.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.transport.auto.nio; - -import java.io.IOException; -import java.net.Socket; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import javax.net.ServerSocketFactory; -import javax.net.ssl.SSLEngine; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerServiceAware; -import org.apache.activemq.broker.SslContext; -import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer; -import org.apache.activemq.broker.transport.auto.AutoTransportUtils; -import org.apache.activemq.openwire.OpenWireFormatFactory; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportServer; -import org.apache.activemq.transport.nio.NIOSSLTransport; -import org.apache.activemq.transport.nio.NIOSSLTransportFactory; -import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; -import org.apache.activemq.transport.tcp.TcpTransportFactory; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.URISupport; -import org.apache.activemq.wireformat.WireFormat; - -/** - * - * - */ -public class AutoNioSslTransportFactory extends NIOSSLTransportFactory implements BrokerServiceAware { - protected BrokerService brokerService; - - /* (non-Javadoc) - * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) - */ - @Override - public void setBrokerService(BrokerService brokerService) { - this.brokerService = brokerService; - } - - @Override - protected AutoNIOSSLTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { - return new AutoNIOSSLTransportServer(context, this, location, serverSocketFactory, brokerService, enabledProtocols) { - - @Override - protected Transport createTransport(Socket socket, WireFormat format, SSLEngine engine, InitBuffer initBuffer, - ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException { - NIOSSLTransport nioSslTransport = (NIOSSLTransport) detectedFactory.createTransport( - format, socket, engine, initBuffer, inputBuffer); - - if (format.getClass().toString().contains("MQTT")) { - if (!allowLinkStealingSet) { - this.setAllowLinkStealing(true); - } - } - - if (context != null) { - nioSslTransport.setSslContext(context); - } - - nioSslTransport.setNeedClientAuth(isNeedClientAuth()); - nioSslTransport.setWantClientAuth(isWantClientAuth()); - - return nioSslTransport; - } - - }; - - } - - boolean allowLinkStealingSet = false; - private Set<String> enabledProtocols; - - @Override - public TransportServer doBind(final URI location) throws IOException { - try { - if (SslContext.getCurrentSslContext() != null) { - try { - context = SslContext.getCurrentSslContext().getSSLContext(); - } catch (Exception e) { - throw new IOException(e); - } - } - - Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); - - Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto."); - this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols")); - - ServerSocketFactory serverSocketFactory = createServerSocketFactory(); - AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); - server.setWireFormatFactory(new OpenWireFormatFactory()); - if (options.get("allowLinkStealing") != null){ - allowLinkStealingSet = true; - } - IntrospectionSupport.setProperties(server, options); - server.setAutoTransportOptions(IntrospectionSupport.extractProperties(options, "auto.")); - server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport.")); - server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options)); - server.bind(); - - return server; - } catch (URISyntaxException e) { - throw IOExceptionSupport.create(e); - } - } - - -} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioTransportFactory.java deleted file mode 100644 index 0922ae6..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioTransportFactory.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.transport.auto.nio; - -import java.io.IOException; -import java.net.Socket; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import javax.net.ServerSocketFactory; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerServiceAware; -import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer; -import org.apache.activemq.broker.transport.auto.AutoTransportUtils; -import org.apache.activemq.openwire.OpenWireFormatFactory; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportServer; -import org.apache.activemq.transport.nio.NIOTransport; -import org.apache.activemq.transport.nio.NIOTransportFactory; -import org.apache.activemq.transport.tcp.TcpTransport; -import org.apache.activemq.transport.tcp.TcpTransportFactory; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.URISupport; -import org.apache.activemq.wireformat.WireFormat; - -/** - * - * - */ -public class AutoNioTransportFactory extends NIOTransportFactory implements BrokerServiceAware { - protected BrokerService brokerService; - /* (non-Javadoc) - * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) - */ - @Override - public void setBrokerService(BrokerService brokerService) { - this.brokerService = brokerService; - } - - @Override - protected AutoTcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { - return new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) { - @Override - protected TcpTransport createTransport(Socket socket, WireFormat format, TcpTransportFactory detectedTransportFactory) throws IOException { - TcpTransport nioTransport = null; - if (detectedTransportFactory.getClass().equals(NIOTransportFactory.class)) { - nioTransport = new AutoNIOTransport(format, socket,this.initBuffer); - } else { - nioTransport = detectedTransportFactory.createTransport( - format, socket, this.initBuffer); - } - - if (format.getClass().toString().contains("MQTT")) { - if (!allowLinkStealingSet) { - this.setAllowLinkStealing(true); - } - } - - return nioTransport; - } - }; - - } - - boolean allowLinkStealingSet = false; - private Set<String> enabledProtocols; - - @Override - public TransportServer doBind(final URI location) throws IOException { - try { - Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); - - Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto."); - this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols")); - - ServerSocketFactory serverSocketFactory = createServerSocketFactory(); - AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); - //server.setWireFormatFactory(createWireFormatFactory(options)); - server.setWireFormatFactory(new OpenWireFormatFactory()); - if (options.get("allowLinkStealing") != null){ - allowLinkStealingSet = true; - } - IntrospectionSupport.setProperties(server, options); - server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport.")); - server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options)); - server.bind(); - - return server; - } catch (URISyntaxException e) { - throw IOExceptionSupport.create(e); - } - } - - -} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/AmqpProtocolVerifier.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/AmqpProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/AmqpProtocolVerifier.java deleted file mode 100644 index 11f0574..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/AmqpProtocolVerifier.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.transport.protocol; - - -/** - * - * - */ -public class AmqpProtocolVerifier implements ProtocolVerifier { - - static final byte[] PREFIX = new byte[] { 'A', 'M', 'Q', 'P' }; - - @Override - public boolean isProtocol(byte[] value) { - for (int i = 0; i < PREFIX.length; i++) { - if (value[i] != PREFIX[i]) - return false; - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/MqttProtocolVerifier.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/MqttProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/MqttProtocolVerifier.java deleted file mode 100644 index 4b4edd2..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/MqttProtocolVerifier.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.transport.protocol; - -/** - * - * - */ -public class MqttProtocolVerifier implements ProtocolVerifier { - - /* (non-Javadoc) - * @see org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[]) - */ - @Override - public boolean isProtocol(byte[] value) { - boolean mqtt311 = value[4] == 77 && // M - value[5] == 81 && // Q - value[6] == 84 && // T - value[7] == 84; // T - - boolean mqtt31 = value[4] == 77 && // M - value[5] == 81 && // Q - value[6] == 73 && // I - value[7] == 115; // s - - return mqtt311 || mqtt31; - } - - - -} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/OpenWireProtocolVerifier.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/OpenWireProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/OpenWireProtocolVerifier.java deleted file mode 100644 index e218180..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/OpenWireProtocolVerifier.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.transport.protocol; - -import org.apache.activemq.command.WireFormatInfo; -import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.openwire.OpenWireFormatFactory; - -/** - * - * - */ -public class OpenWireProtocolVerifier implements ProtocolVerifier { - - protected final OpenWireFormatFactory wireFormatFactory; - - public OpenWireProtocolVerifier(OpenWireFormatFactory wireFormatFactory) { - this.wireFormatFactory = wireFormatFactory; - } - - /* (non-Javadoc) - * @see org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[]) - */ - @Override - public boolean isProtocol(byte[] value) { - if (value.length < 8) { - throw new IllegalArgumentException("Protocol header length changed " - + value.length); - } - - int start = !((OpenWireFormat)wireFormatFactory.createWireFormat()).isSizePrefixDisabled() ? 4 : 0; - int j = 0; - // type - if (value[start] != WireFormatInfo.DATA_STRUCTURE_TYPE) { - return false; - } - start++; - WireFormatInfo info = new WireFormatInfo(); - final byte[] magic = info.getMagic(); - int remainingLen = value.length - start; - int useLen = remainingLen > magic.length ? magic.length : remainingLen; - useLen += start; - // magic - for (int i = start; i < useLen; i++) { - if (value[i] != magic[j]) { - return false; - } - j++; - } - return true; - } - -} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/ProtocolVerifier.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/ProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/ProtocolVerifier.java deleted file mode 100644 index c5755c7..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/ProtocolVerifier.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.apache.activemq.broker.transport.protocol; - - -public interface ProtocolVerifier { - - public boolean isProtocol(byte[] value); - - -} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/StompProtocolVerifier.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/StompProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/StompProtocolVerifier.java deleted file mode 100644 index f18f1c7..0000000 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/StompProtocolVerifier.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.transport.protocol; - -import java.nio.charset.StandardCharsets; - -/** - * - * - */ -public class StompProtocolVerifier implements ProtocolVerifier { - - /* (non-Javadoc) - * @see org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[]) - */ - @Override - public boolean isProtocol(byte[] value) { - String frameStart = new String(value, StandardCharsets.US_ASCII); - return frameStart.startsWith("CONNECT") || frameStart.startsWith("STOMP"); - } - - - - -} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java new file mode 100644 index 0000000..8787547 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportFactory.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.auto; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import javax.net.ServerSocketFactory; +import javax.net.ssl.SSLServerSocketFactory; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.tcp.SslTransportFactory; +import org.apache.activemq.transport.tcp.SslTransportServer; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.URISupport; +import org.apache.activemq.wireformat.WireFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class AutoSslTransportFactory extends SslTransportFactory implements BrokerServiceAware { + private static final Logger LOG = LoggerFactory.getLogger(AutoSslTransportFactory.class); + + + protected BrokerService brokerService; + /* (non-Javadoc) + * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) + */ + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + private Set<String> enabledProtocols; + + /** + * Overriding to use SslTransportServer and allow for proper reflection. + */ + @Override + public TransportServer doBind(final URI location) throws IOException { + try { + Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); + + Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto."); + this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols")); + + ServerSocketFactory serverSocketFactory = createServerSocketFactory(); + AutoSslTransportServer server = createAutoSslTransportServer(location, (SSLServerSocketFactory)serverSocketFactory); + if (options.get("allowLinkStealing") != null){ + allowLinkStealingSet = true; + } + IntrospectionSupport.setProperties(server, options); + server.setAutoTransportOptions(IntrospectionSupport.extractProperties(options, "auto.")); + server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport.")); + server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options)); + server.bind(); + + return server; + } catch (URISyntaxException e) { + throw IOExceptionSupport.create(e); + } + } + + boolean allowLinkStealingSet = false; + + /** + * Allows subclasses of SslTransportFactory to create custom instances of + * SslTransportServer. + * + * @param location + * @param serverSocketFactory + * @return + * @throws IOException + * @throws URISyntaxException + */ + // @Override + protected AutoSslTransportServer createAutoSslTransportServer(final URI location, SSLServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + AutoSslTransportServer server = new AutoSslTransportServer(this, location, serverSocketFactory, + this.brokerService, enabledProtocols) { + @Override + protected TcpTransport createTransport(Socket socket, WireFormat format) + throws IOException { + if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) { + this.setAllowLinkStealing(true); + } + return super.createTransport(socket, format); + } + }; + return server; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java new file mode 100644 index 0000000..1f1a9c6 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoSslTransportServer.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.auto; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Set; + +import javax.net.ServerSocketFactory; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLServerSocketFactory; +import javax.net.ssl.SSLSocket; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.tcp.SslTransport; +import org.apache.activemq.transport.tcp.SslTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.wireformat.WireFormat; + +/** + * An SSL TransportServer. + * + * Allows for client certificate authentication (refer to setNeedClientAuth for + * details). + * NOTE: Client certificate authentication is disabled by default. + * + */ +public class AutoSslTransportServer extends AutoTcpTransportServer { + + + + // Specifies if sockets created from this server should needClientAuth. + private boolean needClientAuth; + + // Specifies if sockets created from this server should wantClientAuth. + private boolean wantClientAuth; + +// /** +// * Creates a ssl transport server for the specified url using the provided +// * serverSocketFactory +// * +// * @param transportFactory The factory used to create transports when connections arrive. +// * @param location The location of the broker to bind to. +// * @param serverSocketFactory The factory used to create this server. +// * @throws IOException passed up from TcpTransportFactory. +// * @throws URISyntaxException passed up from TcpTransportFactory. +// */ +// public SslTransportServer(SslTransportFactory transportFactory, URI location, SSLServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { +// super(transportFactory, location, serverSocketFactory); +// } + + public AutoSslTransportServer(SslTransportFactory transportFactory, + URI location, SSLServerSocketFactory serverSocketFactory, + BrokerService brokerService, Set<String> enabledProtocols) throws IOException, URISyntaxException { + super(transportFactory, location, serverSocketFactory, brokerService, enabledProtocols); + // TODO Auto-generated constructor stub + } + + /** + * Sets whether client authentication should be required + * Must be called before {@link #bind()} + * Note: Calling this method clears the wantClientAuth flag + * in the underlying implementation. + */ + public void setNeedClientAuth(boolean needAuth) { + this.needClientAuth = needAuth; + } + + /** + * Returns whether client authentication should be required. + */ + public boolean getNeedClientAuth() { + return this.needClientAuth; + } + + /** + * Returns whether client authentication should be requested. + */ + public boolean getWantClientAuth() { + return this.wantClientAuth; + } + + /** + * Sets whether client authentication should be requested. + * Must be called before {@link #bind()} + * Note: Calling this method clears the needClientAuth flag + * in the underlying implementation. + */ + public void setWantClientAuth(boolean wantAuth) { + this.wantClientAuth = wantAuth; + } + + /** + * Binds this socket to the previously specified URI. + * + * Overridden to allow for proper handling of needClientAuth. + * + * @throws IOException passed up from TcpTransportServer. + */ + @Override + public void bind() throws IOException { + super.bind(); + if (needClientAuth) { + ((SSLServerSocket)this.serverSocket).setNeedClientAuth(true); + } else if (wantClientAuth) { + ((SSLServerSocket)this.serverSocket).setWantClientAuth(true); + } + } + + /** + * Used to create Transports for this server. + * + * Overridden to allow the use of SslTransports (instead of TcpTransports). + * + * @param socket The incoming socket that will be wrapped into the new Transport. + * @param format The WireFormat being used. + * @return The newly return (SSL) Transport. + * @throws IOException + */ + @Override + protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException { + return new SslTransport(format, (SSLSocket)socket, this.initBuffer); + } + + @Override + public boolean isSslServer() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java new file mode 100644 index 0000000..4f7d42a --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportFactory.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.auto; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import javax.net.ServerSocketFactory; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.URISupport; +import org.apache.activemq.wireformat.WireFormat; + +/** + * + * + */ +public class AutoTcpTransportFactory extends TcpTransportFactory implements BrokerServiceAware { + + protected BrokerService brokerService; + /* (non-Javadoc) + * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) + */ + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + + @Override + public TransportServer doBind(final URI location) throws IOException { + try { + Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); + + Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto."); + this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols")); + + ServerSocketFactory serverSocketFactory = createServerSocketFactory(); + AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); + //server.setWireFormatFactory(createWireFormatFactory(options)); + server.setWireFormatFactory(new OpenWireFormatFactory()); + if (options.get("allowLinkStealing") != null){ + allowLinkStealingSet = true; + } + IntrospectionSupport.setProperties(server, options); + server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport.")); + server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options)); + server.bind(); + + return server; + } catch (URISyntaxException e) { + throw IOExceptionSupport.create(e); + } + } + + boolean allowLinkStealingSet = false; + private Set<String> enabledProtocols; + + @Override + protected AutoTcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + AutoTcpTransportServer server = new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) { + + @Override + protected TcpTransport createTransport(Socket socket, WireFormat format) + throws IOException { + if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) { + this.setAllowLinkStealing(true); + } + return super.createTransport(socket, format); + } + + }; + + return server; + } +}