Repository: activemq Updated Branches: refs/heads/master e5a94bfee -> e14aca871
http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java new file mode 100644 index 0000000..dd2655e --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTcpTransportServer.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.auto; + +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.net.ServerSocketFactory; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.transport.InactivityIOException; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.protocol.AmqpProtocolVerifier; +import org.apache.activemq.transport.protocol.MqttProtocolVerifier; +import org.apache.activemq.transport.protocol.OpenWireProtocolVerifier; +import org.apache.activemq.transport.protocol.ProtocolVerifier; +import org.apache.activemq.transport.protocol.StompProtocolVerifier; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.util.FactoryFinder; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.wireformat.WireFormat; 
+import org.apache.activemq.wireformat.WireFormatFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A TCP based implementation of {@link TransportServer} + */ +public class AutoTcpTransportServer extends TcpTransportServer { + + private static final Logger LOG = LoggerFactory.getLogger(AutoTcpTransportServer.class); + + protected Map<String, Map<String, Object>> wireFormatOptions; + protected Map<String, Object> autoTransportOptions; + protected Set<String> enabledProtocols; + protected final Map<String, ProtocolVerifier> protocolVerifiers = new ConcurrentHashMap<String, ProtocolVerifier>(); + + protected BrokerService brokerService; + + protected int maxConnectionThreadPoolSize = Integer.MAX_VALUE; + protected int protocolDetectionTimeOut = 30000; + + private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/"); + private final ConcurrentMap<String, TransportFactory> transportFactories = new ConcurrentHashMap<String, TransportFactory>(); + + private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/"); + + public WireFormatFactory findWireFormatFactory(String scheme, Map<String, Map<String, Object>> options) throws IOException { + WireFormatFactory wff = null; + try { + wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(scheme); + if (options != null) { + IntrospectionSupport.setProperties(wff, options.get(AutoTransportUtils.ALL)); + IntrospectionSupport.setProperties(wff, options.get(scheme)); + } + if (wff instanceof OpenWireFormatFactory) { + protocolVerifiers.put(AutoTransportUtils.OPENWIRE, new OpenWireProtocolVerifier((OpenWireFormatFactory) wff)); + } + return wff; + } catch (Throwable e) { + throw IOExceptionSupport.create("Could not create wire format factory for: " + scheme + ", reason: " + e, e); + } + } + + public TransportFactory findTransportFactory(String scheme, 
Map<String, ?> options) throws IOException { + scheme = append(scheme, "nio"); + scheme = append(scheme, "ssl"); + + if (scheme.isEmpty()) { + scheme = "tcp"; + } + + TransportFactory tf = transportFactories.get(scheme); + if (tf == null) { + // Try to load if from a META-INF property. + try { + tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme); + if (options != null) { + IntrospectionSupport.setProperties(tf, options); + } + transportFactories.put(scheme, tf); + } catch (Throwable e) { + throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); + } + } + return tf; + } + + protected String append(String currentScheme, String scheme) { + if (this.getBindLocation().getScheme().contains(scheme)) { + if (!currentScheme.isEmpty()) { + currentScheme += "+"; + } + currentScheme += scheme; + } + return currentScheme; + } + + /** + * @param transportFactory + * @param location + * @param serverSocketFactory + * @throws IOException + * @throws URISyntaxException + */ + public AutoTcpTransportServer(TcpTransportFactory transportFactory, + URI location, ServerSocketFactory serverSocketFactory, BrokerService brokerService, + Set<String> enabledProtocols) + throws IOException, URISyntaxException { + super(transportFactory, location, serverSocketFactory); + + //Use an executor service here to handle new connections. 
Setting the max number + //of threads to the maximum number of connections the thread count isn't unbounded + service = new ThreadPoolExecutor(maxConnectionThreadPoolSize, + maxConnectionThreadPoolSize, + 30L, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); + //allow the thread pool to shrink if the max number of threads isn't needed + service.allowCoreThreadTimeOut(true); + + this.brokerService = brokerService; + this.enabledProtocols = enabledProtocols; + initProtocolVerifiers(); + } + + public int getMaxConnectionThreadPoolSize() { + return maxConnectionThreadPoolSize; + } + + public void setMaxConnectionThreadPoolSize(int maxConnectionThreadPoolSize) { + this.maxConnectionThreadPoolSize = maxConnectionThreadPoolSize; + service.setCorePoolSize(maxConnectionThreadPoolSize); + service.setMaximumPoolSize(maxConnectionThreadPoolSize); + } + + public void setProtocolDetectionTimeOut(int protocolDetectionTimeOut) { + this.protocolDetectionTimeOut = protocolDetectionTimeOut; + } + + @Override + public void setWireFormatFactory(WireFormatFactory factory) { + super.setWireFormatFactory(factory); + initOpenWireProtocolVerifier(); + } + + protected void initProtocolVerifiers() { + initOpenWireProtocolVerifier(); + + if (isAllProtocols() || enabledProtocols.contains(AutoTransportUtils.AMQP)) { + protocolVerifiers.put(AutoTransportUtils.AMQP, new AmqpProtocolVerifier()); + } + if (isAllProtocols() || enabledProtocols.contains(AutoTransportUtils.STOMP)) { + protocolVerifiers.put(AutoTransportUtils.STOMP, new StompProtocolVerifier()); + } + if (isAllProtocols()|| enabledProtocols.contains(AutoTransportUtils.MQTT)) { + protocolVerifiers.put(AutoTransportUtils.MQTT, new MqttProtocolVerifier()); + } + } + + protected void initOpenWireProtocolVerifier() { + if (isAllProtocols() || enabledProtocols.contains(AutoTransportUtils.OPENWIRE)) { + OpenWireProtocolVerifier owpv; + if (wireFormatFactory instanceof OpenWireFormatFactory) { + owpv = new 
OpenWireProtocolVerifier((OpenWireFormatFactory) wireFormatFactory); + } else { + owpv = new OpenWireProtocolVerifier(new OpenWireFormatFactory()); + } + protocolVerifiers.put(AutoTransportUtils.OPENWIRE, owpv); + } + } + + protected boolean isAllProtocols() { + return enabledProtocols == null || enabledProtocols.isEmpty(); + } + + + protected final ThreadPoolExecutor service; + + + /** + * This holds the initial buffer that has been read to detect the protocol. + */ + public InitBuffer initBuffer; + + @Override + protected void handleSocket(final Socket socket) { + final AutoTcpTransportServer server = this; + //This needs to be done in a new thread because + //the socket might be waiting on the client to send bytes + //doHandleSocket can't complete until the protocol can be detected + service.submit(new Runnable() { + @Override + public void run() { + server.doHandleSocket(socket); + } + }); + } + + @Override + protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { + final InputStream is = socket.getInputStream(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + + final AtomicInteger readBytes = new AtomicInteger(0); + final ByteBuffer data = ByteBuffer.allocate(8); + // We need to peak at the first 8 bytes of the buffer to detect the protocol + Future<?> future = executor.submit(new Runnable() { + @Override + public void run() { + try { + do { + int read = is.read(); + if (read == -1) { + throw new IOException("Connection faild, stream is closed."); + } + data.put((byte) read); + readBytes.incrementAndGet(); + } while (readBytes.get() < 8); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + }); + + try { + //Wait for protocolDetectionTimeOut if defined + if (protocolDetectionTimeOut > 0) { + future.get(protocolDetectionTimeOut, TimeUnit.MILLISECONDS); + } else { + future.get(); + } + data.flip(); + } catch (TimeoutException e) { + throw new 
InactivityIOException("Client timed out before wire format could be detected. " + + " 8 bytes are required to detect the protocol but only: " + readBytes + " were sent."); + } + + ProtocolInfo protocolInfo = detectProtocol(data.array()); + + initBuffer = new InitBuffer(readBytes.get(), ByteBuffer.allocate(readBytes.get())); + initBuffer.buffer.put(data.array()); + + if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) { + ((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService); + } + + WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat(); + Transport transport = createTransport(socket, format,protocolInfo.detectedTransportFactory); + + return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory); + } + + @Override + protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException { + return new TcpTransport(format, socket, this.initBuffer); + } + + /** + * @param socket + * @param format + * @param detectedTransportFactory + * @return + */ + protected TcpTransport createTransport(Socket socket, WireFormat format, + TcpTransportFactory detectedTransportFactory) throws IOException { + return createTransport(socket, format); + } + + public void setWireFormatOptions(Map<String, Map<String, Object>> wireFormatOptions) { + this.wireFormatOptions = wireFormatOptions; + } + + public void setEnabledProtocols(Set<String> enabledProtocols) { + this.enabledProtocols = enabledProtocols; + } + + public void setAutoTransportOptions(Map<String, Object> autoTransportOptions) { + this.autoTransportOptions = autoTransportOptions; + if (autoTransportOptions.get("protocols") != null) { + this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoTransportOptions.get("protocols")); + } + } + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + if (service != null) { + service.shutdown(); + } + super.doStop(stopper); + } 
+ + protected ProtocolInfo detectProtocol(byte[] buffer) throws IOException { + TcpTransportFactory detectedTransportFactory = transportFactory; + WireFormatFactory detectedWireFormatFactory = wireFormatFactory; + + boolean found = false; + for (String scheme : protocolVerifiers.keySet()) { + if (protocolVerifiers.get(scheme).isProtocol(buffer)) { + LOG.debug("Detected " + scheme); + detectedWireFormatFactory = findWireFormatFactory(scheme, wireFormatOptions); + + if (scheme.equals("default")) { + scheme = ""; + } + + detectedTransportFactory = (TcpTransportFactory) findTransportFactory(scheme, transportOptions); + found = true; + break; + } + } + + if (!found) { + throw new IllegalStateException("Could not detect wire format"); + } + + return new ProtocolInfo(detectedTransportFactory, detectedWireFormatFactory); + + } + + protected class ProtocolInfo { + public final TcpTransportFactory detectedTransportFactory; + public final WireFormatFactory detectedWireFormatFactory; + + public ProtocolInfo(TcpTransportFactory detectedTransportFactory, + WireFormatFactory detectedWireFormatFactory) { + super(); + this.detectedTransportFactory = detectedTransportFactory; + this.detectedWireFormatFactory = detectedWireFormatFactory; + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTransportUtils.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTransportUtils.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTransportUtils.java new file mode 100644 index 0000000..14823db --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/AutoTransportUtils.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.auto; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.activemq.util.IntrospectionSupport; + +/** + * + * + */ +public class AutoTransportUtils { + + //wireformats + public static String ALL = "all"; + public static String OPENWIRE = "default"; + public static String STOMP = "stomp"; + public static String AMQP = "amqp"; + public static String MQTT = "mqtt"; + + //transports + public static String AUTO = "auto"; + + public static Map<String, Map<String, Object>> extractWireFormatOptions(Map<String, String> options ) { + Map<String, Map<String, Object>> wireFormatOptions = new HashMap<>(); + if (options != null) { + wireFormatOptions.put(OPENWIRE, IntrospectionSupport.extractProperties(options, "wireFormat.default.")); + wireFormatOptions.put(STOMP, IntrospectionSupport.extractProperties(options, "wireFormat.stomp.")); + wireFormatOptions.put(AMQP, IntrospectionSupport.extractProperties(options, "wireFormat.amqp.")); + wireFormatOptions.put(MQTT, IntrospectionSupport.extractProperties(options, "wireFormat.mqtt.")); + wireFormatOptions.put(ALL, IntrospectionSupport.extractProperties(options, "wireFormat.")); + } + return wireFormatOptions; + } 
+ + public static Set<String> parseProtocols(String protocolString) { + Set<String> protocolSet = new HashSet<>();; + if (protocolString != null && !protocolString.isEmpty()) { + protocolSet.addAll(Arrays.asList(protocolString.split(","))); + } + return protocolSet; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java new file mode 100644 index 0000000..a04bc6e --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java @@ -0,0 +1,159 @@ +package org.apache.activemq.transport.auto.nio; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.net.ServerSocketFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.InactivityIOException; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.auto.AutoTcpTransportServer; +import org.apache.activemq.transport.nio.AutoInitNioSSLTransport; +import org.apache.activemq.transport.nio.NIOSSLTransport; +import org.apache.activemq.transport.tcp.TcpTransport; +import 
org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.wireformat.WireFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +public class AutoNIOSSLTransportServer extends AutoTcpTransportServer { + + private static final Logger LOG = LoggerFactory.getLogger(AutoNIOSSLTransportServer.class); + + private SSLContext context; + + public AutoNIOSSLTransportServer(SSLContext context, TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory, + BrokerService brokerService, Set<String> enabledProtocols) throws IOException, URISyntaxException { + super(transportFactory, location, serverSocketFactory, brokerService, enabledProtocols); + + this.context = context; + } + + private boolean needClientAuth; + private boolean wantClientAuth; + + protected Transport createTransport(Socket socket, WireFormat format, SSLEngine engine, + InitBuffer initBuffer, ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException { + NIOSSLTransport transport = new NIOSSLTransport(format, socket, engine, initBuffer, inputBuffer); + if (context != null) { + transport.setSslContext(context); + } + + transport.setNeedClientAuth(needClientAuth); + transport.setWantClientAuth(wantClientAuth); + + + return transport; + } + + @Override + protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException { + throw new UnsupportedOperationException("method not supported"); + } + + @Override + public boolean isSslServer() { + return true; + } + + public boolean isNeedClientAuth() { + return this.needClientAuth; + } + + public void setNeedClientAuth(boolean value) { + this.needClientAuth = value; + } + + public boolean isWantClientAuth() { + return this.wantClientAuth; + } + + public void setWantClientAuth(boolean value) { + this.wantClientAuth = value; + } + + + @Override + protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { + ExecutorService executor = Executors.newSingleThreadExecutor(); + + //The SSLEngine needs to be initialized and handshake done to get the first command and 
detect the format + final AutoInitNioSSLTransport in = new AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket); + if (context != null) { + in.setSslContext(context); + } + in.start(); + SSLEngine engine = in.getSslSession(); + + Future<Integer> future = executor.submit(new Callable<Integer>() { + @Override + public Integer call() throws Exception { + //Wait for handshake to finish initializing + do { + in.serviceRead(); + } while(in.readSize < 8); + + return in.readSize; + } + }); + + try { + future.get(protocolDetectionTimeOut, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new InactivityIOException("Client timed out before wire format could be detected. " + + " 8 bytes are required to detect the protocol but only: " + in.readSize + " were sent."); + } + + in.stop(); + + initBuffer = new InitBuffer(in.readSize, ByteBuffer.allocate(in.read.length)); + initBuffer.buffer.put(in.read); + + ProtocolInfo protocolInfo = detectProtocol(in.read); + + if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) { + ((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService); + } + + WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat(); + Transport transport = createTransport(socket, format, engine, initBuffer, in.getInputBuffer(), protocolInfo.detectedTransportFactory); + + return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory); + } + + +} + + http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOTransport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOTransport.java new file mode 100644 index 0000000..a9a9f5d --- /dev/null +++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOTransport.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.auto.nio; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.UnknownHostException; + +import javax.net.SocketFactory; + +import org.apache.activemq.transport.nio.NIOTransport; +import org.apache.activemq.wireformat.WireFormat; + +/** + * + * + */ +public class AutoNIOTransport extends NIOTransport { + + public AutoNIOTransport(WireFormat format, Socket socket, + InitBuffer initBuffer) throws IOException { + super(format, socket, initBuffer); + } + + public AutoNIOTransport(WireFormat wireFormat, Socket socket) + throws IOException { + super(wireFormat, socket); + } + + public AutoNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, + URI remoteLocation, URI localLocation) throws UnknownHostException, + IOException { + super(wireFormat, socketFactory, remoteLocation, localLocation); + } + + + boolean doneInitBuffer = false; + + /** + * Read from the initial buffer if it is set + */ + @Override + protected int readFromBuffer() throws IOException { + int readSize = 0; + 
if (!doneInitBuffer) { + if (initBuffer == null || initBuffer.readSize < 8) { + throw new IOException("Protocol type could not be determined."); + } + if (nextFrameSize == -1) { + readSize = 4; + this.initBuffer.buffer.flip(); + for (int i = 0; i < 4; i++) { + currentBuffer.put(initBuffer.buffer.get()); + } + } else { + for (int i = 0; i < 4; i++) { + currentBuffer.put(initBuffer.buffer.get()); + } + readSize = 4; + doneInitBuffer = true; + } + + } else { + readSize += channel.read(currentBuffer); + } + return readSize; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioSslTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioSslTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioSslTransportFactory.java new file mode 100644 index 0000000..8a29ab2 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioSslTransportFactory.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.auto.nio; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import javax.net.ServerSocketFactory; +import javax.net.ssl.SSLEngine; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.broker.SslContext; +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.auto.AutoTcpTransportServer; +import org.apache.activemq.transport.auto.AutoTransportUtils; +import org.apache.activemq.transport.nio.NIOSSLTransport; +import org.apache.activemq.transport.nio.NIOSSLTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.URISupport; +import org.apache.activemq.wireformat.WireFormat; + +/** + * + * + */ +public class AutoNioSslTransportFactory extends NIOSSLTransportFactory implements BrokerServiceAware { + protected BrokerService brokerService; + + /* (non-Javadoc) + * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) + */ + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + @Override + protected AutoNIOSSLTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new AutoNIOSSLTransportServer(context, this, location, serverSocketFactory, brokerService, enabledProtocols) { + + @Override + protected 
Transport createTransport(Socket socket, WireFormat format, SSLEngine engine, InitBuffer initBuffer, + ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException { + NIOSSLTransport nioSslTransport = (NIOSSLTransport) detectedFactory.createTransport( + format, socket, engine, initBuffer, inputBuffer); + + if (format.getClass().toString().contains("MQTT")) { + if (!allowLinkStealingSet) { + this.setAllowLinkStealing(true); + } + } + + if (context != null) { + nioSslTransport.setSslContext(context); + } + + nioSslTransport.setNeedClientAuth(isNeedClientAuth()); + nioSslTransport.setWantClientAuth(isWantClientAuth()); + + return nioSslTransport; + } + + }; + + } + + boolean allowLinkStealingSet = false; + private Set<String> enabledProtocols; + + @Override + public TransportServer doBind(final URI location) throws IOException { + try { + if (SslContext.getCurrentSslContext() != null) { + try { + context = SslContext.getCurrentSslContext().getSSLContext(); + } catch (Exception e) { + throw new IOException(e); + } + } + + Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); + + Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto."); + this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols")); + + ServerSocketFactory serverSocketFactory = createServerSocketFactory(); + AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); + server.setWireFormatFactory(new OpenWireFormatFactory()); + if (options.get("allowLinkStealing") != null){ + allowLinkStealingSet = true; + } + IntrospectionSupport.setProperties(server, options); + server.setAutoTransportOptions(IntrospectionSupport.extractProperties(options, "auto.")); + server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport.")); + server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options)); + 
server.bind(); + + return server; + } catch (URISyntaxException e) { + throw IOExceptionSupport.create(e); + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java new file mode 100644 index 0000000..52244ff --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNioTransportFactory.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.auto.nio; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import javax.net.ServerSocketFactory; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.auto.AutoTcpTransportServer; +import org.apache.activemq.transport.auto.AutoTransportUtils; +import org.apache.activemq.transport.nio.NIOTransport; +import org.apache.activemq.transport.nio.NIOTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.URISupport; +import org.apache.activemq.wireformat.WireFormat; + +/** + * Transport factory registered for the {@code auto+nio} scheme. It binds an + * {@link AutoTcpTransportServer} that builds an {@code AutoNIOTransport} when the detected + * factory is exactly {@link NIOTransportFactory} and otherwise delegates transport creation + * to the detected factory. + */ +public class AutoNioTransportFactory extends NIOTransportFactory implements BrokerServiceAware { + protected BrokerService brokerService; + /* (non-Javadoc) + * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) + */ + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + /** + * Builds the server for {@code location}, passing along the broker service and the + * currently enabled protocols. The returned server overrides {@code createTransport} to + * pick the transport implementation and to switch on link stealing for wire formats whose + * class name contains "MQTT", unless {@code allowLinkStealing} was configured explicitly. + */ + @Override + protected AutoTcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) { + @Override + protected TcpTransport createTransport(Socket socket, WireFormat format, TcpTransportFactory detectedTransportFactory) throws IOException { + TcpTransport nioTransport
= null; + if (detectedTransportFactory.getClass().equals(NIOTransportFactory.class)) { + nioTransport = new AutoNIOTransport(format, socket,this.initBuffer); + } else { + nioTransport = detectedTransportFactory.createTransport( + format, socket, this.initBuffer); + } + + if (format.getClass().toString().contains("MQTT")) { + if (!allowLinkStealingSet) { + this.setAllowLinkStealing(true); + } + } + + return nioTransport; + } + }; + + } + + /** Set to true by doBind once a bind URI carried an explicit allowLinkStealing option. */ + boolean allowLinkStealingSet = false; + /** Protocols parsed from the auto.protocols URI option in doBind. */ + private Set<String> enabledProtocols; + + /** + * Creates, configures and binds the auto-detecting server for {@code location}. + * <p> + * Parses the URI query parameters, reads the {@code auto.protocols} option into + * {@link #enabledProtocols}, notes whether {@code allowLinkStealing} was supplied explicitly, + * applies the remaining options to the server and finally binds it. + * + * @param location the URI to bind; its query parameters configure the server + * @return the bound server + * @throws IOException if the location is not a valid URI, or an I/O failure propagates + * from creating or binding the server + */ + @Override + public TransportServer doBind(final URI location) throws IOException { + try { + Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); + + Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto."); + this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols")); + + ServerSocketFactory serverSocketFactory = createServerSocketFactory(); + AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); + //server.setWireFormatFactory(createWireFormatFactory(options)); + server.setWireFormatFactory(new OpenWireFormatFactory()); + if (options.get("allowLinkStealing") != null){ + allowLinkStealingSet = true; + } + IntrospectionSupport.setProperties(server, options); + server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport.")); + server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options)); + server.bind(); + + return server; + } catch (URISyntaxException e) { + throw IOExceptionSupport.create(e); + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
b/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java index f922e98..000ec41 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java @@ -172,17 +172,11 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport { while (true) { if (!plain.hasRemaining()) { - int readCount = secureRead(plain); - if (readCount == 0) { - break; - } - // channel is closed, cleanup if (readCount == -1) { onException(new EOFException()); - selection.close(); break; } @@ -191,8 +185,11 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport { if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) { processCommand(plain); - //Break when command is found - break; + //we have received enough bytes to detect the protocol + if (receiveCounter >= 8) { + readSize = receiveCounter; + break; + } } } } catch (IOException e) { @@ -204,8 +201,13 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport { @Override protected void processCommand(ByteBuffer plain) throws Exception { - read = plain.array(); - readSize = receiveCounter; + ByteBuffer newBuffer = ByteBuffer.allocate(receiveCounter); + if (read != null) { + newBuffer.put(read); + } + newBuffer.put(plain); + newBuffer.flip(); + read = newBuffer.array(); } @@ -214,7 +216,6 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport { taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport Task"); // no need to init as we can delay that until demand (eg in doHandshake) connect(); - //super.doStart(); } @@ -224,10 +225,6 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport { taskRunnerFactory.shutdownNow(); taskRunnerFactory = null; } -// if (selection != null) { -// selection.close(); -// selection = null; -// } } 
http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/AmqpProtocolVerifier.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/AmqpProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/AmqpProtocolVerifier.java new file mode 100644 index 0000000..fa6d6c6 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/AmqpProtocolVerifier.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.protocol; + + +/** + * {@link ProtocolVerifier} that recognizes AMQP connections by the literal + * "AMQP" bytes at the start of the protocol header. + */ +public class AmqpProtocolVerifier implements ProtocolVerifier { + + /** Leading bytes this verifier looks for. */ + static final byte[] PREFIX = new byte[] { 'A', 'M', 'Q', 'P' }; + + /** + * Checks whether the supplied bytes start with {@link #PREFIX}. + * + * @param value the header bytes to inspect + * @return {@code true} if {@code value} begins with "AMQP"; {@code false} otherwise, + * including when fewer than {@code PREFIX.length} bytes are supplied + */ + @Override + public boolean isProtocol(byte[] value) { + /* Too short to hold the prefix: it cannot match, and indexing would run off the array. */ + if (value.length < PREFIX.length) { + return false; + } + for (int i = 0; i < PREFIX.length; i++) { + if (value[i] != PREFIX[i]) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/MqttProtocolVerifier.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/MqttProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/MqttProtocolVerifier.java new file mode 100644 index 0000000..e989f7e --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/MqttProtocolVerifier.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.activemq.transport.protocol; + +/** + * {@link ProtocolVerifier} that recognizes MQTT connections by the four protocol-name bytes + * at offsets 4-7 of the header: "MQTT" (tracked as {@code mqtt311}) or "MQIs" + * (tracked as {@code mqtt31}). + */ +public class MqttProtocolVerifier implements ProtocolVerifier { + + /** + * Checks offsets 4-7 of the supplied bytes for an MQTT protocol name. + * + * @param value the header bytes to inspect + * @return {@code true} if the bytes at offsets 4-7 spell "MQTT" or "MQIs"; + * {@code false} otherwise, including when fewer than 8 bytes are supplied + * @see org.apache.activemq.transport.protocol.ProtocolVerifier#isProtocol(byte[]) + */ + @Override + public boolean isProtocol(byte[] value) { + /* Offsets 4..7 are read below, so anything shorter than 8 bytes cannot match. */ + if (value.length < 8) { + return false; + } + + boolean mqtt311 = value[4] == 77 && // M + value[5] == 81 && // Q + value[6] == 84 && // T + value[7] == 84; // T + + boolean mqtt31 = value[4] == 77 && // M + value[5] == 81 && // Q + value[6] == 73 && // I + value[7] == 115; // s + + return mqtt311 || mqtt31; + } + + + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/OpenWireProtocolVerifier.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/OpenWireProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/OpenWireProtocolVerifier.java new file mode 100644 index 0000000..71277c2 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/OpenWireProtocolVerifier.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.protocol; + +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.openwire.OpenWireFormatFactory; + +/** + * {@link ProtocolVerifier} that recognizes OpenWire connections by looking for a + * {@link WireFormatInfo} marker (its data structure type followed by its magic bytes) + * at the start of the header. + */ +public class OpenWireProtocolVerifier implements ProtocolVerifier { + + protected final OpenWireFormatFactory wireFormatFactory; + + public OpenWireProtocolVerifier(OpenWireFormatFactory wireFormatFactory) { + this.wireFormatFactory = wireFormatFactory; + } + + /** + * Checks the supplied bytes for a {@link WireFormatInfo} marker. The type byte is expected + * at offset 4, or at offset 0 when the wire format created by the factory reports its size + * prefix as disabled; the bytes after it are compared with as much of the magic as the + * buffer holds. + * + * @param value the header bytes to inspect + * @return {@code true} if the type byte and the available magic bytes match + * @throws IllegalArgumentException if fewer than 8 bytes are supplied + * @see org.apache.activemq.transport.protocol.ProtocolVerifier#isProtocol(byte[]) + */ + @Override + public boolean isProtocol(byte[] value) { + if (value.length < 8) { + throw new IllegalArgumentException("Protocol header length changed " + + value.length); + } + + int start = !((OpenWireFormat)wireFormatFactory.createWireFormat()).isSizePrefixDisabled() ? 4 : 0; + int j = 0; + // type + if (value[start] != WireFormatInfo.DATA_STRUCTURE_TYPE) { + return false; + } + start++; + WireFormatInfo info = new WireFormatInfo(); + final byte[] magic = info.getMagic(); + int remainingLen = value.length - start; + int useLen = remainingLen > magic.length ?
magic.length : remainingLen; + useLen += start; + // magic + for (int i = start; i < useLen; i++) { + if (value[i] != magic[j]) { + return false; + } + j++; + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/ProtocolVerifier.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/ProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/ProtocolVerifier.java new file mode 100644 index 0000000..8bb2399 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/ProtocolVerifier.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.activemq.transport.protocol; + + +/** + * Strategy for deciding whether a protocol header belongs to one particular wire protocol. + */ +public interface ProtocolVerifier { + + /** + * Tests the supplied header bytes against the protocol this verifier recognizes. + * Some implementations require at least 8 bytes (OpenWireProtocolVerifier throws + * IllegalArgumentException for fewer). + * + * @param value the header bytes to inspect + * @return {@code true} if the bytes match this verifier's protocol + */ + public boolean isProtocol(byte[] value); + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/StompProtocolVerifier.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/StompProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/StompProtocolVerifier.java new file mode 100644 index 0000000..5e7275d --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/StompProtocolVerifier.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.activemq.transport.protocol; + +import java.nio.charset.StandardCharsets; + +/** + * {@link ProtocolVerifier} that recognizes STOMP connections by the command name at the + * start of the header: "CONNECT" or "STOMP". + */ +public class StompProtocolVerifier implements ProtocolVerifier { + + /** + * Decodes the supplied bytes as US-ASCII and checks how the resulting text begins. + * + * @param value the header bytes to inspect + * @return {@code true} if the decoded text starts with "CONNECT" or "STOMP" + * @see org.apache.activemq.transport.protocol.ProtocolVerifier#isProtocol(byte[]) + */ + @Override + public boolean isProtocol(byte[] value) { + String frameStart = new String(value, StandardCharsets.US_ASCII); + return frameStart.startsWith("CONNECT") || frameStart.startsWith("STOMP"); + } + + + + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto index d6791eb..3af6c99 100644 --- a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto +++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto @@ -14,4 +14,4 @@ ## See the License for the specific language governing permissions and ## limitations under the License.
## --------------------------------------------------------------------------- -class=org.apache.activemq.broker.transport.auto.AutoTcpTransportFactory \ No newline at end of file +class=org.apache.activemq.transport.auto.AutoTcpTransportFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio index a03cc27..d5d7802 100644 --- a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio +++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio @@ -14,4 +14,4 @@ ## See the License for the specific language governing permissions and ## limitations under the License. 
## --------------------------------------------------------------------------- -class=org.apache.activemq.broker.transport.auto.nio.AutoNioTransportFactory \ No newline at end of file +class=org.apache.activemq.transport.auto.nio.AutoNioTransportFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl index 74a08c1..29972e3 100644 --- a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl +++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl @@ -14,4 +14,4 @@ ## See the License for the specific language governing permissions and ## limitations under the License. 
## --------------------------------------------------------------------------- -class=org.apache.activemq.broker.transport.auto.nio.AutoNioSslTransportFactory \ No newline at end of file +class=org.apache.activemq.transport.auto.nio.AutoNioSslTransportFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl index d6e626e..23d1099 100644 --- a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl +++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl @@ -14,4 +14,4 @@ ## See the License for the specific language governing permissions and ## limitations under the License. 
## --------------------------------------------------------------------------- -class=org.apache.activemq.broker.transport.auto.AutoSslTransportFactory \ No newline at end of file +class=org.apache.activemq.transport.auto.AutoSslTransportFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java index eeb68d8..97148ac 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java @@ -73,8 +73,9 @@ public class NIOSSLTransport extends NIOTransport { ByteBuffer inputBuffer) throws IOException { super(wireFormat, socket, initBuffer); this.sslEngine = engine; - if (engine != null) + if (engine != null) { this.sslSession = engine.getSession(); + } this.inputBuffer = inputBuffer; } @@ -146,11 +147,13 @@ public class NIOSSLTransport extends NIOTransport { this.buffOut = outputStream; //If the sslEngine was not passed in, then handshake - if (!hasSslEngine) + if (!hasSslEngine) { sslEngine.beginHandshake(); + } handshakeStatus = sslEngine.getHandshakeStatus(); - if (!hasSslEngine) + if (!hasSslEngine) { doHandshake(); + } // if (hasSslEngine) { selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() { @@ -328,27 +331,28 @@ public class NIOSSLTransport extends NIOTransport { currentBuffer.putInt(nextFrameSize); } else { - // If its all in one read then we can just take it all, otherwise take only // the current frame size and the next iteration starts a new command. 
- if (currentBuffer.remaining() >= plain.remaining()) { - currentBuffer.put(plain); - } else { - byte[] fill = new byte[currentBuffer.remaining()]; - plain.get(fill); - currentBuffer.put(fill); - } + if (currentBuffer != null) { + if (currentBuffer.remaining() >= plain.remaining()) { + currentBuffer.put(plain); + } else { + byte[] fill = new byte[currentBuffer.remaining()]; + plain.get(fill); + currentBuffer.put(fill); + } - // Either we have enough data for a new command or we have to wait for some more. - if (currentBuffer.hasRemaining()) { - return; - } else { - currentBuffer.flip(); - Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer))); - doConsume(command); - nextFrameSize = -1; - currentBuffer = null; - } + // Either we have enough data for a new command or we have to wait for some more. + if (currentBuffer.hasRemaining()) { + return; + } else { + currentBuffer.flip(); + Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer))); + doConsume(command); + nextFrameSize = -1; + currentBuffer = null; + } + } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index c7fe00f..5896e74 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -481,47 +481,60 @@ public class TcpTransportServer extends TransportServerThreadSupport implements final protected void doHandleSocket(Socket socket) { boolean closeSocket = true; + boolean countIncremented = false; try { - if 
(this.currentTransportCount.get() >= this.maximumConnections) { - throw new ExceededMaximumConnectionsException( - "Exceeded the maximum number of allowed client connections. See the '" + - "maximumConnections' property on the TCP transport configuration URI " + - "in the ActiveMQ configuration file (e.g., activemq.xml)"); - } else { - currentTransportCount.incrementAndGet(); - - HashMap<String, Object> options = new HashMap<String, Object>(); - options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration)); - options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay)); - options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion)); - options.put("trace", Boolean.valueOf(trace)); - options.put("soTimeout", Integer.valueOf(soTimeout)); - options.put("socketBufferSize", Integer.valueOf(socketBufferSize)); - options.put("connectionTimeout", Integer.valueOf(connectionTimeout)); - options.put("logWriterName", logWriterName); - options.put("dynamicManagement", Boolean.valueOf(dynamicManagement)); - options.put("startLogging", Boolean.valueOf(startLogging)); - options.putAll(transportOptions); - - TransportInfo transportInfo = configureTransport(this, socket); - closeSocket = false; - - if (transportInfo.transport instanceof ServiceSupport) { - ((ServiceSupport) transportInfo.transport).addServiceListener(this); - } + int currentCount; + do { + currentCount = currentTransportCount.get(); + if (currentCount >= this.maximumConnections) { + throw new ExceededMaximumConnectionsException( + "Exceeded the maximum number of allowed client connections. 
See the '" + + "maximumConnections' property on the TCP transport configuration URI " + + "in the ActiveMQ configuration file (e.g., activemq.xml)"); + } + + //Increment this value before configuring the transport + //This is necessary because some of the transport servers must read from the + //socket during configureTransport() so we want to make sure this value is + //accurate as the transport server could pause here waiting for data to be sent from a client + } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1)); + countIncremented = true; + + HashMap<String, Object> options = new HashMap<String, Object>(); + options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration)); + options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay)); + options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion)); + options.put("trace", Boolean.valueOf(trace)); + options.put("soTimeout", Integer.valueOf(soTimeout)); + options.put("socketBufferSize", Integer.valueOf(socketBufferSize)); + options.put("connectionTimeout", Integer.valueOf(connectionTimeout)); + options.put("logWriterName", logWriterName); + options.put("dynamicManagement", Boolean.valueOf(dynamicManagement)); + options.put("startLogging", Boolean.valueOf(startLogging)); + options.putAll(transportOptions); + + TransportInfo transportInfo = configureTransport(this, socket); + closeSocket = false; + + if (transportInfo.transport instanceof ServiceSupport) { + ((ServiceSupport) transportInfo.transport).addServiceListener(this); + } - Transport configuredTransport = transportInfo.transportFactory.serverConfigure( - transportInfo.transport, transportInfo.format, options); + Transport configuredTransport = transportInfo.transportFactory.serverConfigure( + transportInfo.transport, transportInfo.format, options); + + getAcceptListener().onAccept(configuredTransport); - getAcceptListener().onAccept(configuredTransport); - } } catch 
(SocketTimeoutException ste) { // expect this to happen - currentTransportCount.decrementAndGet(); } catch (Exception e) { - currentTransportCount.decrementAndGet(); if (closeSocket) { try { + //when closing the socket, only decrement the count if it was actually + //incremented earlier in this method + if (countIncremented) { + currentTransportCount.decrementAndGet(); + } socket.close(); } catch (Exception ignore) { } http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/auto/AutoStompConnectTimeoutTest.java ---------------------------------------------------------------------- diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/auto/AutoStompConnectTimeoutTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/auto/AutoStompConnectTimeoutTest.java new file mode 100644 index 0000000..e7b372b --- /dev/null +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/auto/AutoStompConnectTimeoutTest.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.activemq.transport.stomp.auto; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.Socket; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLSocketFactory; + +import org.apache.activemq.transport.auto.AutoTcpTransportServer; +import org.apache.activemq.transport.stomp.StompTestSupport; +import org.apache.activemq.util.Wait; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests that connection attempts which never send a complete connect frame get cleaned up + * by the protocolDetectionTimeOut property. Runs once per auto connector scheme. + */ +@RunWith(Parameterized.class) +public class AutoStompConnectTimeoutTest extends StompTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(AutoStompConnectTimeoutTest.class); + + private Socket connection; + protected String connectorScheme; + + @Parameters(name="{0}") + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {"auto"}, + {"auto+ssl"}, + {"auto+nio"}, + {"auto+nio+ssl"} + }); + } + + public AutoStompConnectTimeoutTest(String connectorScheme) { + this.connectorScheme = connectorScheme; + } + + protected String getConnectorScheme() { + return connectorScheme; + } + + @Override + public void tearDown() throws Exception { + if (connection != null) { + try { + connection.close(); + } catch (Throwable e) {} + connection = null; + } + super.tearDown(); + } + + @Override + public String getAdditionalConfig() { + return "?protocolDetectionTimeOut=1500"; + } + + /** + * Opens a socket from a separate thread, writes the single byte 'C' and nothing more, + * then waits for the server's transport count to reach 1 and afterwards to drop back to 0. + */ + @Test(timeout = 15000) + public void testInactivityMonitor() throws Exception { + + Thread t1 = new Thread() { + + @Override + public void run() { + try { + connection = createSocket(); + connection.getOutputStream().write('C'); +
connection.getOutputStream().flush(); + } catch (Exception ex) { + LOG.error("unexpected exception on connect/disconnect", ex); + exceptions.add(ex); + } + } + }; + + t1.start(); + + assertTrue("one connection", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + AutoTcpTransportServer server = (AutoTcpTransportServer) brokerService.getTransportConnectorByScheme(getConnectorScheme()).getServer(); + return 1 == server.getCurrentTransportCount().get(); + } + }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250))); + + // and it should be closed due to inactivity + assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + AutoTcpTransportServer server = (AutoTcpTransportServer) brokerService.getTransportConnectorByScheme(getConnectorScheme()).getServer(); + return 0 == server.getCurrentTransportCount().get(); + } + }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500))); + + assertTrue("no exceptions", exceptions.isEmpty()); + } + + @Override + protected boolean isUseTcpConnector() { + return false; + } + @Override + protected boolean isUseAutoConnector() { + return connectorScheme.equalsIgnoreCase("auto"); + } + + @Override + protected boolean isUseAutoSslConnector() { + return connectorScheme.equalsIgnoreCase("auto+ssl"); + } + + @Override + protected boolean isUseAutoNioConnector() { + return connectorScheme.equalsIgnoreCase("auto+nio"); + } + + @Override + protected boolean isUseAutoNioPlusSslConnector() { + return connectorScheme.equalsIgnoreCase("auto+nio+ssl"); + } + + @Override + protected Socket createSocket() throws IOException { + + boolean useSSL = false; + int port = 0; + + switch (connectorScheme) { + case "auto": + port = this.autoPort; + break; + case "auto+ssl": + useSSL = true; + port = this.autoSslPort; + break; + case "auto+nio": + port = this.autoNioPort; + break; + case "auto+nio+ssl": +
useSSL = true; + port = this.autoNioSslPort; + break; + default: + throw new IOException("Invalid STOMP connector scheme passed to test."); + } + + if (useSSL) { + return SSLSocketFactory.getDefault().createSocket("localhost", port); + } else { + return new Socket("localhost", port); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoNIOSslTransportBrokerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoNIOSslTransportBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoNIOSslTransportBrokerTest.java index 496ccf7..cb90b41 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoNIOSslTransportBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoNIOSslTransportBrokerTest.java @@ -49,7 +49,6 @@ public class AutoNIOSslTransportBrokerTest extends TransportBrokerTestSupport { System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE); System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD); System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE); - //System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager"); maxWait = 10000; super.setUp(); http://git-wip-us.apache.org/repos/asf/activemq/blob/e14aca87/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java new file mode 100644 index 0000000..bbc20a3 --- /dev/null +++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportMaxConnectionsTest.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.auto; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.springframework.jms.support.JmsUtils; + +@RunWith(Parameterized.class) +public class AutoTransportMaxConnectionsTest { + + public static final String KEYSTORE_TYPE = "jks"; + public static 
final String PASSWORD = "password"; + public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore"; + public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore"; + private static final int maxConnections = 20; + + private final ExecutorService executor = Executors.newCachedThreadPool(); + private String connectionUri; + private BrokerService service; + private TransportConnector connector; + private final String transportType; + + @Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {"auto"}, + {"auto+nio"}, + {"auto+ssl"}, + {"auto+nio+ssl"}, + }); + } + + + public AutoTransportMaxConnectionsTest(String transportType) { + super(); + this.transportType = transportType; + } + + + @Before + public void setUp() throws Exception { + System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE); + System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD); + System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE); + System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD); + System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE); + + service = new BrokerService(); + service.setPersistent(false); + service.setUseJmx(false); + connector = service.addConnector(transportType + "://0.0.0.0:0?maxConnectionThreadPoolSize=10&maximumConnections="+maxConnections); + connectionUri = connector.getPublishableConnectString(); + service.start(); + service.waitUntilStarted(); + } + + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(connectionUri); + } + + @Test + public void testMaxConnectionControl() throws Exception { + final ConnectionFactory cf = createConnectionFactory(); + final CountDownLatch startupLatch = new CountDownLatch(1); + + for(int i = 0; i < maxConnections + 20; i++) { + executor.submit(new Runnable() { + @Override + 
public void run() { + Connection conn = null; + try { + startupLatch.await(); + conn = cf.createConnection(); + conn.start(); + } catch (Exception e) { + //JmsUtils.closeConnection(conn); + } + } + }); + } + + TcpTransportServer transportServer = (TcpTransportServer)connector.getServer(); + // ensure the max connections is in effect + assertEquals(maxConnections, transportServer.getMaximumConnections()); + // No connections at first + assertEquals(0, connector.getConnections().size()); + // Release the latch to set up connections in parallel + startupLatch.countDown(); + + final TransportConnector connector = this.connector; + + // Expect the max connections is created + assertTrue("Expected: " + maxConnections + " found: " + connector.getConnections().size(), + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return connector.getConnections().size() == maxConnections; + } + }) + ); + + } + + @After + public void tearDown() throws Exception { + executor.shutdown(); + + service.stop(); + service.waitUntilStopped(); + } +}