Author: rgodfrey Date: Sun Feb 1 21:45:19 2015 New Revision: 1656365 URL: http://svn.apache.org/r1656365 Log: Remove accepting thread and use non blocking io accept
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java?rev=1656365&r1=1656364&r2=1656365&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java Sun Feb 1 21:45:19 2015 @@ -22,7 +22,6 @@ package org.apache.qpid.transport.networ import java.io.IOException; import java.net.InetSocketAddress; -import java.net.Socket; import java.net.StandardSocketOptions; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; @@ -47,18 +46,39 @@ public class NonBlockingNetworkTransport CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); - private AcceptingThread _acceptor; private SelectorThread _selector; + + private Set<TransportEncryption> _encryptionSet; + private volatile boolean _closed = false; + private NetworkTransportConfiguration _config; + private ProtocolEngineFactory _factory; + private SSLContext _sslContext; + private ServerSocketChannel _serverSocket; + private int _timeout; + public void close() { - if(_acceptor != null) - { - _acceptor.close(); - } if(_selector != null) { - _selector.close(); + try + { + if (_serverSocket != null) + { + _selector.cancelAcceptingSocket(_serverSocket); + _serverSocket.close(); + } + } + catch (IOException e) + { + // TODO + e.printStackTrace(); + } + finally + { + + _selector.close(); + } } } @@ -69,41 +89,7 @@ public class NonBlockingNetworkTransport { try { - _acceptor = new AcceptingThread(config, factory, sslContext, encryptionSet); - _acceptor.setDaemon(false); - _acceptor.start(); - _selector = new SelectorThread(config.getAddress().toString()); - _selector.start(); - } - catch (IOException e) - { - throw new TransportException("Failed to start AMQP on port : " + config, e); - } - - - } - - public int getAcceptingPort() - { - return _acceptor == null ? -1 : _acceptor.getPort(); - } - - private class AcceptingThread extends Thread - { - private final Set<TransportEncryption> _encryptionSet; - private volatile boolean _closed = false; - private final NetworkTransportConfiguration _config; - private final ProtocolEngineFactory _factory; - private final SSLContext _sslContext; - private final ServerSocketChannel _serverSocket; - private int _timeout; - - private AcceptingThread(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, - SSLContext sslContext, - final Set<TransportEncryption> encryptionSet) throws IOException - { _config = config; _factory = factory; _sslContext = sslContext; @@ -115,158 +101,83 @@ public class NonBlockingNetworkTransport _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true); _serverSocket.bind(address); + _serverSocket.configureBlocking(false); _encryptionSet = encryptionSet; - } - - - /** - Close the underlying ServerSocket if it has not already been closed. - */ - public void close() - { - LOGGER.debug("Shutting down the Acceptor"); - _closed = true; - if (!_serverSocket.socket().isClosed()) - { - try - { - _serverSocket.close(); - } - catch (IOException e) - { - throw new TransportException(e); - } - } - } - - private int getPort() - { - return _serverSocket.socket().getLocalPort(); + _selector = new SelectorThread(config.getAddress().toString(), this); + _selector.start(); + _selector.addAcceptingSocket(_serverSocket); } - - @Override - public void run() + catch (IOException e) { - try - { - while (!_closed) - { - SocketChannel socketChannel = null; - try - { - socketChannel = _serverSocket.accept(); - - acceptSocketChannel(socketChannel); - } - catch(RuntimeException e) - { - LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); - closeSocketIfNecessary(socketChannel.socket()); - } - catch(IOException e) - { - if(!_closed) - { - LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); - closeSocketIfNecessary(socketChannel.socket()); - try - { - //Delay to avoid tight spinning the loop during issues such as too many open files - Thread.sleep(1000); - } - catch (InterruptedException ie) - { - LOGGER.debug("Stopping acceptor due to interrupt request"); - _closed = true; - } - } - } - } - } - finally - { - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " - + _config.getAddress()); - } - } + throw new TransportException("Failed to start AMQP on port : " + config, e); } - public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException - { - final ServerProtocolEngine engine = - (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() - .getRemoteSocketAddress()); - - if(engine != null) - { - socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); - socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); - final Integer sendBufferSize = _config.getSendBufferSize(); - final Integer receiveBufferSize = _config.getReceiveBufferSize(); + } - socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); - socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + public int getAcceptingPort() + { + return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort(); + } + public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException + { + final ServerProtocolEngine engine = + (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() + .getRemoteSocketAddress()); + + if(engine != null) + { + socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); + socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); + + final Integer sendBufferSize = _config.getSendBufferSize(); + final Integer receiveBufferSize = _config.getReceiveBufferSize(); + + socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); + socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + + + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + + NonBlockingConnection connection = + new NonBlockingConnection(socketChannel, + engine, + sendBufferSize, + receiveBufferSize, + _timeout, + ticker, + _encryptionSet, + _sslContext, + _config.wantClientAuth(), + _config.needClientAuth(), + new Runnable() + { - final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); - - NonBlockingConnection connection = - new NonBlockingConnection(socketChannel, - engine, - sendBufferSize, - receiveBufferSize, - _timeout, - ticker, - _encryptionSet, - _sslContext, - _config.wantClientAuth(), - _config.needClientAuth(), - new Runnable() + @Override + public void run() { + engine.encryptedTransport(); + } + }, + _selector); - @Override - public void run() - { - engine.encryptedTransport(); - } - }, - _selector); - - engine.setNetworkConnection(connection, connection.getSender()); - connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); + engine.setNetworkConnection(connection, connection.getSender()); + connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); - ticker.setConnection(connection); + ticker.setConnection(connection); - connection.start(); + connection.start(); - _selector.addConnection(connection); + _selector.addConnection(connection); - } - else - { - socketChannel.close(); - } } - - private void closeSocketIfNecessary(final Socket socket) + else { - if(socket != null) - { - try - { - socket.close(); - } - catch (IOException e) - { - LOGGER.debug("Exception while closing socket", e); - } - } + socketChannel.close(); } - } + } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java?rev=1656365&r1=1656364&r2=1656365&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java Sun Feb 1 21:45:19 2015 @@ -20,8 +20,11 @@ package org.apache.qpid.transport.network.io; import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -38,16 +41,18 @@ import java.util.concurrent.atomic.Atomi */ public class SelectorThread extends Thread { - + private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>(); private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>(); private final Selector _selector; private final AtomicBoolean _closed = new AtomicBoolean(); private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(); + private final NonBlockingNetworkTransport _transport; - SelectorThread(final String name) + SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport) { super("SelectorThread-"+name); + _transport = nonBlockingNetworkTransport; try { _selector = Selector.open(); @@ -59,6 +64,45 @@ public class SelectorThread extends Thre } } + public void addAcceptingSocket(final ServerSocketChannel socketChannel) + { + _tasks.add(new Runnable() + { + @Override + public void run() + { + + try + { + socketChannel.register(_selector, SelectionKey.OP_ACCEPT); + } + catch (ClosedChannelException e) + { + // TODO + e.printStackTrace(); + } + } + }); + _selector.wakeup(); + } + + public void cancelAcceptingSocket(final ServerSocketChannel socketChannel) + { + _tasks.add(new Runnable() + { + @Override + public void run() + { + SelectionKey selectionKey = socketChannel.keyFor(_selector); + if(selectionKey != null) + { + selectionKey.cancel(); + } + } + }); + _selector.wakeup(); + } + @Override public void run() { @@ -72,18 +116,33 @@ public class SelectorThread extends Thre _selector.select(nextTimeout); + while(_tasks.peek() != null) + { + Runnable task = _tasks.poll(); + task.run(); + } + List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); Set<SelectionKey> selectionKeys = _selector.selectedKeys(); for (SelectionKey key : selectionKeys) { - NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); - - key.channel().register(_selector, 0); - - toBeScheduled.add(connection); - _unscheduledConnections.remove(connection); + if(key.isAcceptable()) + { + // todo - should we schedule this rather than running in this thread? + SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept(); + _transport.acceptSocketChannel(acceptedChannel); + } + else + { + NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); + + key.channel().register(_selector, 0); + + toBeScheduled.add(connection); + _unscheduledConnections.remove(connection); + } } selectionKeys.clear(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org