Author: rgodfrey
Date: Sun Feb  1 21:45:19 2015
New Revision: 1656365

URL: http://svn.apache.org/r1656365
Log:
Remove the accepting thread and use non-blocking I/O accept

Modified:
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java?rev=1656365&r1=1656364&r2=1656365&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
 Sun Feb  1 21:45:19 2015
@@ -22,7 +22,6 @@ package org.apache.qpid.transport.networ
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.net.StandardSocketOptions;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
@@ -47,18 +46,39 @@ public class NonBlockingNetworkTransport
                                                           
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
     private static final int HANDSHAKE_TIMEOUT = 
Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
                                                                    
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
-    private AcceptingThread _acceptor;
     private SelectorThread _selector;
 
+
+    private  Set<TransportEncryption> _encryptionSet;
+    private volatile boolean _closed = false;
+    private NetworkTransportConfiguration _config;
+    private ProtocolEngineFactory _factory;
+    private SSLContext _sslContext;
+    private ServerSocketChannel _serverSocket;
+    private int _timeout;
+
     public void close()
     {
-        if(_acceptor != null)
-        {
-            _acceptor.close();
-        }
         if(_selector != null)
         {
-            _selector.close();
+            try
+            {
+                if (_serverSocket != null)
+                {
+                    _selector.cancelAcceptingSocket(_serverSocket);
+                    _serverSocket.close();
+                }
+            }
+            catch (IOException e)
+            {
+                // TODO
+                e.printStackTrace();
+            }
+            finally
+            {
+
+                _selector.close();
+            }
         }
     }
 
@@ -69,41 +89,7 @@ public class NonBlockingNetworkTransport
     {
         try
         {
-            _acceptor = new AcceptingThread(config, factory, sslContext, 
encryptionSet);
-            _acceptor.setDaemon(false);
-            _acceptor.start();
 
-            _selector = new SelectorThread(config.getAddress().toString());
-            _selector.start();
-        }
-        catch (IOException e)
-        {
-            throw new TransportException("Failed to start AMQP on port : " + 
config, e);
-        }
-
-
-    }
-
-    public int getAcceptingPort()
-    {
-        return _acceptor == null ? -1 : _acceptor.getPort();
-    }
-
-    private class AcceptingThread extends Thread
-    {
-        private final Set<TransportEncryption> _encryptionSet;
-        private volatile boolean _closed = false;
-        private final NetworkTransportConfiguration _config;
-        private final ProtocolEngineFactory _factory;
-        private final SSLContext _sslContext;
-        private final ServerSocketChannel _serverSocket;
-        private int _timeout;
-
-        private AcceptingThread(NetworkTransportConfiguration config,
-                                ProtocolEngineFactory factory,
-                                SSLContext sslContext,
-                                final Set<TransportEncryption> encryptionSet) 
throws IOException
-        {
             _config = config;
             _factory = factory;
             _sslContext = sslContext;
@@ -115,158 +101,83 @@ public class NonBlockingNetworkTransport
 
             _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
             _serverSocket.bind(address);
+            _serverSocket.configureBlocking(false);
             _encryptionSet = encryptionSet;
-        }
-
-
-        /**
-         Close the underlying ServerSocket if it has not already been closed.
-         */
-        public void close()
-        {
-            LOGGER.debug("Shutting down the Acceptor");
-            _closed = true;
 
-            if (!_serverSocket.socket().isClosed())
-            {
-                try
-                {
-                    _serverSocket.close();
-                }
-                catch (IOException e)
-                {
-                    throw new TransportException(e);
-                }
-            }
-        }
-
-        private int getPort()
-        {
-            return _serverSocket.socket().getLocalPort();
+            _selector = new SelectorThread(config.getAddress().toString(), 
this);
+            _selector.start();
+            _selector.addAcceptingSocket(_serverSocket);
         }
-
-        @Override
-        public void run()
+        catch (IOException e)
         {
-            try
-            {
-                while (!_closed)
-                {
-                    SocketChannel socketChannel = null;
-                    try
-                    {
-                        socketChannel = _serverSocket.accept();
-
-                        acceptSocketChannel(socketChannel);
-                    }
-                    catch(RuntimeException e)
-                    {
-                        LOGGER.error("Error in Acceptor thread on address " + 
_config.getAddress(), e);
-                        closeSocketIfNecessary(socketChannel.socket());
-                    }
-                    catch(IOException e)
-                    {
-                        if(!_closed)
-                        {
-                            LOGGER.error("Error in Acceptor thread on address 
" + _config.getAddress(), e);
-                            closeSocketIfNecessary(socketChannel.socket());
-                            try
-                            {
-                                //Delay to avoid tight spinning the loop 
during issues such as too many open files
-                                Thread.sleep(1000);
-                            }
-                            catch (InterruptedException ie)
-                            {
-                                LOGGER.debug("Stopping acceptor due to 
interrupt request");
-                                _closed = true;
-                            }
-                        }
-                    }
-                }
-            }
-            finally
-            {
-                if(LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Acceptor exiting, no new connections will be 
accepted on address "
-                                 + _config.getAddress());
-                }
-            }
+            throw new TransportException("Failed to start AMQP on port : " + 
config, e);
         }
 
-        public void acceptSocketChannel(final SocketChannel socketChannel) 
throws IOException
-        {
-            final ServerProtocolEngine engine =
-                    (ServerProtocolEngine) 
_factory.newProtocolEngine(socketChannel.socket()
-                                                                              
.getRemoteSocketAddress());
-
-            if(engine != null)
-            {
-                socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, 
_config.getTcpNoDelay());
-                socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
 
-                final Integer sendBufferSize = _config.getSendBufferSize();
-                final Integer receiveBufferSize = 
_config.getReceiveBufferSize();
+    }
 
-                socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, 
sendBufferSize);
-                socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 
receiveBufferSize);
+    public int getAcceptingPort()
+    {
+        return _serverSocket == null ? -1 : 
_serverSocket.socket().getLocalPort();
+    }
 
+    public void acceptSocketChannel(final SocketChannel socketChannel) throws 
IOException
+    {
+        final ServerProtocolEngine engine =
+                (ServerProtocolEngine) 
_factory.newProtocolEngine(socketChannel.socket()
+                                                                          
.getRemoteSocketAddress());
+
+        if(engine != null)
+        {
+            socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, 
_config.getTcpNoDelay());
+            socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+
+            final Integer sendBufferSize = _config.getSendBufferSize();
+            final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+            socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, 
sendBufferSize);
+            socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 
receiveBufferSize);
+
+
+            final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, 
TIMEOUT);
+
+            NonBlockingConnection connection =
+                    new NonBlockingConnection(socketChannel,
+                                              engine,
+                                              sendBufferSize,
+                                              receiveBufferSize,
+                                              _timeout,
+                                              ticker,
+                                              _encryptionSet,
+                                              _sslContext,
+                                              _config.wantClientAuth(),
+                                              _config.needClientAuth(),
+                                              new Runnable()
+                                              {
 
-                final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, 
TIMEOUT);
-
-                NonBlockingConnection connection =
-                        new NonBlockingConnection(socketChannel,
-                                                  engine,
-                                                  sendBufferSize,
-                                                  receiveBufferSize,
-                                                  _timeout,
-                                                  ticker,
-                                                  _encryptionSet,
-                                                  _sslContext,
-                                                  _config.wantClientAuth(),
-                                                  _config.needClientAuth(),
-                                                  new Runnable()
+                                                  @Override
+                                                  public void run()
                                                   {
+                                                      
engine.encryptedTransport();
+                                                  }
+                                              },
+                                              _selector);
 
-                                                      @Override
-                                                      public void run()
-                                                      {
-                                                          
engine.encryptedTransport();
-                                                      }
-                                                  },
-                                                  _selector);
-
-                engine.setNetworkConnection(connection, 
connection.getSender());
-                connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+            engine.setNetworkConnection(connection, connection.getSender());
+            connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
 
-                ticker.setConnection(connection);
+            ticker.setConnection(connection);
 
-                connection.start();
+            connection.start();
 
-                _selector.addConnection(connection);
+            _selector.addConnection(connection);
 
-            }
-            else
-            {
-                socketChannel.close();
-            }
         }
-
-        private void closeSocketIfNecessary(final Socket socket)
+        else
         {
-            if(socket != null)
-            {
-                try
-                {
-                    socket.close();
-                }
-                catch (IOException e)
-                {
-                    LOGGER.debug("Exception while closing socket", e);
-                }
-            }
+            socketChannel.close();
         }
-
     }
 
+
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java?rev=1656365&r1=1656364&r2=1656365&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
 Sun Feb  1 21:45:19 2015
@@ -20,8 +20,11 @@
 package org.apache.qpid.transport.network.io;
 
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -38,16 +41,18 @@ import java.util.concurrent.atomic.Atomi
 */
 public class SelectorThread extends Thread
 {
-
+    private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
     private final Queue<NonBlockingConnection> _unregisteredConnections = new 
ConcurrentLinkedQueue<>();
     private final Set<NonBlockingConnection> _unscheduledConnections = new 
HashSet<>();
     private final Selector _selector;
     private final AtomicBoolean _closed = new AtomicBoolean();
     private final NetworkConnectionScheduler _scheduler = new 
NetworkConnectionScheduler();
+    private final NonBlockingNetworkTransport _transport;
 
-    SelectorThread(final String name)
+    SelectorThread(final String name, final NonBlockingNetworkTransport 
nonBlockingNetworkTransport)
     {
         super("SelectorThread-"+name);
+        _transport = nonBlockingNetworkTransport;
         try
         {
             _selector = Selector.open();
@@ -59,6 +64,45 @@ public class SelectorThread extends Thre
         }
     }
 
+    public void addAcceptingSocket(final ServerSocketChannel socketChannel)
+    {
+        _tasks.add(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+
+                            try
+                            {
+                                socketChannel.register(_selector, 
SelectionKey.OP_ACCEPT);
+                            }
+                            catch (ClosedChannelException e)
+                            {
+                                // TODO
+                                e.printStackTrace();
+                            }
+                        }
+                    });
+        _selector.wakeup();
+    }
+
+    public void cancelAcceptingSocket(final ServerSocketChannel socketChannel)
+    {
+        _tasks.add(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                SelectionKey selectionKey = socketChannel.keyFor(_selector);
+                if(selectionKey != null)
+                {
+                    selectionKey.cancel();
+                }
+            }
+        });
+        _selector.wakeup();
+    }
+
     @Override
     public void run()
     {
@@ -72,18 +116,33 @@ public class SelectorThread extends Thre
 
                 _selector.select(nextTimeout);
 
+                while(_tasks.peek() != null)
+                {
+                    Runnable task = _tasks.poll();
+                    task.run();
+                }
+
                 List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
 
 
                 Set<SelectionKey> selectionKeys = _selector.selectedKeys();
                 for (SelectionKey key : selectionKeys)
                 {
-                    NonBlockingConnection connection = (NonBlockingConnection) 
key.attachment();
-
-                    key.channel().register(_selector, 0);
-
-                    toBeScheduled.add(connection);
-                    _unscheduledConnections.remove(connection);
+                    if(key.isAcceptable())
+                    {
+                        // todo - should we schedule this rather than running 
in this thread?
+                        SocketChannel acceptedChannel = 
((ServerSocketChannel)key.channel()).accept();
+                        _transport.acceptSocketChannel(acceptedChannel);
+                    }
+                    else
+                    {
+                        NonBlockingConnection connection = 
(NonBlockingConnection) key.attachment();
+
+                        key.channel().register(_selector, 0);
+
+                        toBeScheduled.add(connection);
+                        _unscheduledConnections.remove(connection);
+                    }
 
                 }
                 selectionKeys.clear();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to