Author: rgodfrey Date: Thu Oct 15 21:14:02 2015 New Revision: 1708894 URL: http://svn.apache.org/viewvc?rev=1708894&view=rev Log: QPID-6794 : reduce latency after select
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/adapter/BrokerAdapterTest.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1708894&r1=1708893&r2=1708894&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Thu Oct 15 21:14:02 2015 @@ -38,13 +38,13 @@ public class NetworkConnectionScheduler { private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class); private final ThreadFactory _factory; - private volatile SelectorThread _selectorThread; private volatile ThreadPoolExecutor _executor; private final AtomicInteger _running = new AtomicInteger(); private final int _poolSizeMinimum; private final int _poolSizeMaximum; private final long _threadKeepAliveTimeout; private final String _name; + private SelectorThread _selectorThread; public NetworkConnectionScheduler(final String name, int threadPoolSizeMinimum, @@ -84,12 +84,15 @@ public class NetworkConnectionScheduler try { _selectorThread = new SelectorThread(this); - _selectorThread.start(); - _executor = new ThreadPoolExecutor(_poolSizeMinimum, _poolSizeMaximum, + _executor = new ThreadPoolExecutor(_poolSizeMaximum, _poolSizeMaximum, _threadKeepAliveTimeout, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), _factory); _executor.prestartAllCoreThreads(); _executor.allowCoreThreadTimeOut(true); + for(int i = 0 ; i < _poolSizeMaximum; i++) + { + _executor.execute(_selectorThread); + } } catch (IOException e) { @@ -97,45 +100,10 @@ public class NetworkConnectionScheduler } } - public void schedule(final NonBlockingConnection connection) - { - if(connection.setScheduled()) - { - _executor.execute(new Runnable() - { - @Override - public void run() - { - String currentName = Thread.currentThread().getName(); - try - { - Thread.currentThread().setName( - SelectorThread.IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString()); - processConnection(connection); - } - finally - { - Thread.currentThread().setName(currentName); - } - } - }); - - increaseCorePoolPoolSizeIfNecessary(); - } - } - - private void increaseCorePoolPoolSizeIfNecessary() - { - int currentPoolSize; - while((currentPoolSize = _executor.getCorePoolSize()) < _poolSizeMaximum && !_executor.getQueue().isEmpty()) - { - // Currently we do not shrink the core pool size back to its original value - _executor.setCorePoolSize(currentPoolSize + 1); - } - } - - private void processConnection(final NonBlockingConnection connection) + void processConnection(final NonBlockingConnection connection) { + Thread.currentThread().setName( + SelectorThread.IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString()); try { _running.incrementAndGet(); @@ -165,7 +133,8 @@ public class NetworkConnectionScheduler connection.clearScheduled(); if(connection.isStateChanged()) { - schedule(connection); + connection.clearScheduled(); + _selectorThread.addToWork(connection); } else { @@ -186,6 +155,7 @@ public class NetworkConnectionScheduler { _running.decrementAndGet(); } + } public void close() @@ -231,4 +201,14 @@ public class NetworkConnectionScheduler { _selectorThread.removeConnection(connection); } + + int getPoolSizeMaximum() + { + return _poolSizeMaximum; + } + + public void schedule(final NonBlockingConnection connection) + { + _selectorThread.addToWork(connection); + } } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1708894&r1=1708893&r2=1708894&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Thu Oct 15 21:14:02 2015 @@ -25,15 +25,16 @@ import java.net.SocketAddress; import java.nio.channels.SocketChannel; import java.security.Principal; import java.security.cert.Certificate; -import java.util.*; +import java.util.Collection; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.qpid.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.model.port.AmqpPort; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.transport.ByteBufferSender; @@ -492,4 +493,5 @@ public class NonBlockingConnection imple _unexpectedByteBufferSizeUsed = true; } } + } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1708894&r1=1708893&r2=1708894&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Thu Oct 15 21:14:02 2015 @@ -31,7 +31,9 @@ import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; @@ -56,37 +58,47 @@ class SelectorThread extends Thread private final Selector _selector; private final AtomicBoolean _closed = new AtomicBoolean(); + private final AtomicBoolean _selecting = new AtomicBoolean(); private final NetworkConnectionScheduler _scheduler; private long _nextTimeout; - SelectorThread(final NetworkConnectionScheduler scheduler) throws IOException + private final BlockingQueue<Runnable> _workQueue = new LinkedBlockingQueue<>(); + + private Runnable _selectionTask = new Runnable() { - super("Selector-" + scheduler.getName()); + @Override + public void run() + { + performSelect(); + } + }; + SelectorThread(final NetworkConnectionScheduler scheduler) throws IOException + { _selector = Selector.open(); _scheduler = scheduler; + _workQueue.add(_selectionTask); } public void addAcceptingSocket(final ServerSocketChannel socketChannel, final NonBlockingNetworkTransport nonBlockingNetworkTransport) { _tasks.add(new Runnable() - { - @Override - public void run() - { + { + @Override + public void run() + { - try - { - socketChannel.register(_selector, SelectionKey.OP_ACCEPT, nonBlockingNetworkTransport); - } - catch (IllegalStateException | ClosedChannelException e) - { - // TODO Communicate condition back to model object to make it go into the ERROR state - LOGGER.error("Failed to register selector on accepting port", e); - } - } - }); + try + { + socketChannel.register(_selector, SelectionKey.OP_ACCEPT, nonBlockingNetworkTransport); + } + catch (IllegalStateException | ClosedChannelException e) + { + // TODO Communicate condition back to model object to make it go into the ERROR state + LOGGER.error("Failed to register selector on accepting port", e); + } } + }); _selector.wakeup(); } @@ -98,7 +110,7 @@ class SelectorThread extends Thread public void run() { SelectionKey selectionKey = socketChannel.keyFor(_selector); - if(selectionKey != null) + if (selectionKey != null) { selectionKey.cancel(); } @@ -107,90 +119,173 @@ class SelectorThread extends Thread _selector.wakeup(); } - @Override - public void run() + private List<NonBlockingConnection> processUnscheduledConnections() { + List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); - _nextTimeout = 0; - - try + long currentTime = System.currentTimeMillis(); + Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); + _nextTimeout = Integer.MAX_VALUE; + while (iterator.hasNext()) { - while (!_closed.get()) - { + NonBlockingConnection connection = iterator.next(); + int period = connection.getTicker().getTimeToNextTick(currentTime); + + if (period <= 0 || connection.isStateChanged()) + { + toBeScheduled.add(connection); try { - _selector.select(_nextTimeout); + unregisterConnection(connection); } - catch (IOException e) + catch (ClosedChannelException e) { - // TODO Inform the model object - LOGGER.error("Failed to trying to select()",e ); - break; + LOGGER.debug("Failed to register with selector for connection " + connection + + ". Connection is probably being closed by peer.", e); } + iterator.remove(); + } + else + { + _nextTimeout = Math.min(period, _nextTimeout); + } + } - runTasks(); - - List<NonBlockingConnection> toBeScheduled = processSelectionKeys(); + return toBeScheduled; + } - toBeScheduled.addAll(reregisterUnregisteredConnections()); + @Override + public void run() + { - toBeScheduled.addAll(processUnscheduledConnections()); + final String name = Thread.currentThread().getName(); + try + { + do + { + Thread.currentThread().setName(name); + Runnable task = _workQueue.take(); + task.run(); - for (NonBlockingConnection connection : toBeScheduled) - { - _scheduler.schedule(connection); - } - } + } while (!_closed.get()); } - finally + catch (InterruptedException e) { - try + Thread.currentThread().interrupt(); + } + + } + + private void closeSelector() + { + try + { + if(_selector.isOpen()) { _selector.close(); } - catch (IOException e) - { - LOGGER.debug("Failed to close selector", e); - } } + catch (IOException e) + { + LOGGER.debug("Failed to close selector", e); + } } - private List<NonBlockingConnection> processUnscheduledConnections() + private static final class ConnectionProcessor implements Runnable { - List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); - long currentTime = System.currentTimeMillis(); - Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); - _nextTimeout = Integer.MAX_VALUE; - while (iterator.hasNext()) + private final NetworkConnectionScheduler _scheduler; + private final NonBlockingConnection _connection; + private AtomicBoolean _running = new AtomicBoolean(); + + public ConnectionProcessor(final NetworkConnectionScheduler scheduler, final NonBlockingConnection connection) { - NonBlockingConnection connection = iterator.next(); + _scheduler = scheduler; + _connection = connection; + } - int period = connection.getTicker().getTimeToNextTick(currentTime); + @Override + public void run() + { + if(_running.compareAndSet(false,true)) + { + _scheduler.processConnection(_connection); + } + } + } - if (period <= 0 || connection.isStateChanged()) + private void performSelect() + { + while(!_closed.get()) + { + if(_selecting.compareAndSet(false,true)) { - toBeScheduled.add(connection); + List<ConnectionProcessor> connections = new ArrayList<>(); try { - unregisterConnection(connection); + if (!_closed.get()) + { + Thread.currentThread().setName("Selector-" + _scheduler.getName()); + try + { + _selector.select(_nextTimeout); + } + catch (IOException e) + { + // TODO Inform the model object + LOGGER.error("Failed to trying to select()", e); + closeSelector(); + return; + } + runTasks(); + for (NonBlockingConnection connection : processSelectionKeys()) + { + if(connection.setScheduled()) + { + connections.add(new ConnectionProcessor(_scheduler, connection)); + } + } + for (NonBlockingConnection connection : reregisterUnregisteredConnections()) + { + if(connection.setScheduled()) + { + connections.add(new ConnectionProcessor(_scheduler, connection)); + } + } + for (NonBlockingConnection connection : processUnscheduledConnections()) + { + if(connection.setScheduled()) + { + connections.add(new ConnectionProcessor(_scheduler, connection)); + } + } + + } } - catch (ClosedChannelException e) + finally { - LOGGER.debug("Failed to register with selector for connection " + connection + - ". Connection is probably being closed by peer.", e); + _selecting.set(false); } - iterator.remove(); + _workQueue.add(_selectionTask); + _workQueue.addAll(connections); + for(ConnectionProcessor connectionProcessor : connections) + { + connectionProcessor.run(); + } + } else { - _nextTimeout = Math.min(period, _nextTimeout); + break; } } - return toBeScheduled; + if(_closed.get() && _selecting.compareAndSet(false,true)) + { + closeSelector(); + } } private void unregisterConnection(final NonBlockingConnection connection) throws ClosedChannelException @@ -318,8 +413,32 @@ class SelectorThread extends Thread public void close() { + Runnable goodNight = new Runnable() + { + @Override + public void run() + { + // Make sure take awakes so it can observe _closed + } + }; _closed.set(true); + + int count = _scheduler.getPoolSizeMaximum(); + while(count-- > 0) + { + _workQueue.offer(goodNight); + } + _selector.wakeup(); + } + public void addToWork(final NonBlockingConnection connection) + { + if(connection.setScheduled()) + { + _workQueue.add(new ConnectionProcessor(_scheduler, connection)); + } + + } } Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/adapter/BrokerAdapterTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/adapter/BrokerAdapterTest.java?rev=1708894&r1=1708893&r2=1708894&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/adapter/BrokerAdapterTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/adapter/BrokerAdapterTest.java Thu Oct 15 21:14:02 2015 @@ -46,6 +46,7 @@ public class BrokerAdapterTest extends Q private TaskExecutorImpl _taskExecutor; private SystemConfig _systemConfig; + private BrokerAdapter _brokerAdapter; @Override public void setUp() throws Exception @@ -66,12 +67,18 @@ public class BrokerAdapterTest extends Q @Override public void tearDown() throws Exception { - try + try { + if (_brokerAdapter != null) + { + _brokerAdapter.close(); + } + if (_taskExecutor != null) { _taskExecutor.stopImmediately(); } + } finally { @@ -112,11 +119,14 @@ public class BrokerAdapterTest extends Q // testing unsuccessful case attributes.put(Broker.CONTEXT, Collections.singletonMap(Broker.NETWORK_BUFFER_SIZE, Broker.MINIMUM_NETWORK_BUFFER_SIZE - 1)); - BrokerAdapter brokerAdapter = new BrokerAdapter(attributes, _systemConfig); - brokerAdapter.open(); - assertEquals("Broker open should fail with network buffer size less then minimum", State.ERRORED, brokerAdapter.getState()); - assertEquals("Unexpected buffer size", Broker.DEFAULT_NETWORK_BUFFER_SIZE, brokerAdapter.getNetworkBufferSize()); - brokerAdapter.close(); + _brokerAdapter = new BrokerAdapter(attributes, _systemConfig); + _brokerAdapter.open(); + assertEquals("Broker open should fail with network buffer size less then minimum", + State.ERRORED, + _brokerAdapter.getState()); + assertEquals("Unexpected buffer size", + Broker.DEFAULT_NETWORK_BUFFER_SIZE, + _brokerAdapter.getNetworkBufferSize()); } private void doAssignTargetSizeTest(final long[] virtualHostQueueSizes, final long flowToDiskThreshold) @@ -124,16 +134,16 @@ public class BrokerAdapterTest extends Q Map<String, Object> attributes = new HashMap<>(); attributes.put("name", "Broker"); attributes.put("context", Collections.singletonMap(Broker.BROKER_FLOW_TO_DISK_THRESHOLD, flowToDiskThreshold)); - BrokerAdapter brokerAdapter = new BrokerAdapter(attributes, _systemConfig); - brokerAdapter.open(); + _brokerAdapter = new BrokerAdapter(attributes, _systemConfig); + _brokerAdapter.open(); for(int i=0; i < virtualHostQueueSizes.length; i++) { - createVhnWithVh(brokerAdapter, i, virtualHostQueueSizes[i]); + createVhnWithVh(_brokerAdapter, i, virtualHostQueueSizes[i]); } long totalAssignedTargetSize = 0; - for(VirtualHostNode<?> vhn : brokerAdapter.getVirtualHostNodes()) + for(VirtualHostNode<?> vhn : _brokerAdapter.getVirtualHostNodes()) { long targetSize = vhn.getVirtualHost().getTargetSize(); assertTrue("A virtualhost's target size cannot be zero", targetSize > 0); @@ -141,7 +151,7 @@ public class BrokerAdapterTest extends Q } long diff = Math.abs(flowToDiskThreshold - totalAssignedTargetSize); - long tolerance = brokerAdapter.getVirtualHostNodes().size() * 2; + long tolerance = _brokerAdapter.getVirtualHostNodes().size() * 2; assertTrue(String.format("Assigned target size not within expected tolerance. Diff %d Tolerance %d", diff, tolerance), diff < tolerance); } Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java?rev=1708894&r1=1708893&r2=1708894&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java Thu Oct 15 21:14:02 2015 @@ -19,11 +19,11 @@ package org.apache.qpid.server.transport; -import org.apache.qpid.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.model.port.AmqpPort; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.network.AggregateTicker; -import org.apache.qpid.transport.network.TransportEncryption; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.io.OutputStream; @@ -33,7 +33,11 @@ import java.net.SocketAddress; import java.util.Collections; import java.util.EnumSet; -import static org.mockito.Mockito.*; +import org.apache.qpid.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.network.AggregateTicker; +import org.apache.qpid.transport.network.TransportEncryption; public class NetworkConnectionSchedulerTest extends QpidTestCase { @@ -57,7 +61,7 @@ public class NetworkConnectionSchedulerT when(verboseEngine.getAggregateTicker()).thenReturn(new AggregateTicker()); when(timidEngine.getAggregateTicker()).thenReturn(new AggregateTicker()); - final NetworkConnectionScheduler scheduler = new NetworkConnectionScheduler(getName(), 1, 1, 1000); + final NetworkConnectionScheduler scheduler = new NetworkConnectionScheduler(getName(), 2, 2, 1000); scheduler.start(); NonBlockingNetworkTransport transport = new NonBlockingNetworkTransport(engineFactory, EnumSet.of(TransportEncryption.NONE), scheduler, port); Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1708894&r1=1708893&r2=1708894&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java Thu Oct 15 21:14:02 2015 @@ -108,8 +108,8 @@ public class TCPandSSLTransportTest exte when(port.getPort()).thenReturn(0); when(port.getNetworkBufferSize()).thenReturn(64*1024); when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true); - when(port.getThreadPoolMinimum()).thenReturn(1); - when(port.getThreadPoolMaximum()).thenReturn(1); + when(port.getThreadPoolMinimum()).thenReturn(2); + when(port.getThreadPoolMaximum()).thenReturn(2); when(port.getSSLContext()).thenReturn(sslContext); when(port.getContextValue(Long.class, AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l); Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java?rev=1708894&r1=1708893&r2=1708894&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java Thu Oct 15 21:14:02 2015 @@ -88,6 +88,10 @@ public class VirtualHostAliasTest extend protected void tearDown() throws Exception { _port.close(); + for (VirtualHost vhost : _vhosts.values()) + { + vhost.close(); + } super.tearDown(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org