Author: rgodfrey
Date: Thu Oct 15 21:14:02 2015
New Revision: 1708894

URL: http://svn.apache.org/viewvc?rev=1708894&view=rev
Log:
QPID-6794 : reduce latency after select

Modified:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/adapter/BrokerAdapterTest.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1708894&r1=1708893&r2=1708894&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
 Thu Oct 15 21:14:02 2015
@@ -38,13 +38,13 @@ public class NetworkConnectionScheduler
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(NetworkConnectionScheduler.class);
     private final ThreadFactory _factory;
-    private volatile SelectorThread _selectorThread;
     private volatile ThreadPoolExecutor _executor;
     private final AtomicInteger _running = new AtomicInteger();
     private final int _poolSizeMinimum;
     private final int _poolSizeMaximum;
     private final long _threadKeepAliveTimeout;
     private final String _name;
+    private SelectorThread _selectorThread;
 
     public NetworkConnectionScheduler(final String name,
                                       int threadPoolSizeMinimum,
@@ -84,12 +84,15 @@ public class NetworkConnectionScheduler
         try
         {
             _selectorThread = new SelectorThread(this);
-            _selectorThread.start();
-            _executor = new ThreadPoolExecutor(_poolSizeMinimum, 
_poolSizeMaximum,
+            _executor = new ThreadPoolExecutor(_poolSizeMaximum, 
_poolSizeMaximum,
                                                _threadKeepAliveTimeout, 
TimeUnit.MINUTES,
                                                new 
LinkedBlockingQueue<Runnable>(), _factory);
             _executor.prestartAllCoreThreads();
             _executor.allowCoreThreadTimeOut(true);
+            for(int i = 0 ; i < _poolSizeMaximum; i++)
+            {
+                _executor.execute(_selectorThread);
+            }
         }
         catch (IOException e)
         {
@@ -97,45 +100,10 @@ public class NetworkConnectionScheduler
         }
     }
 
-    public void schedule(final NonBlockingConnection connection)
-    {
-        if(connection.setScheduled())
-        {
-            _executor.execute(new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    String currentName = Thread.currentThread().getName();
-                    try
-                    {
-                        Thread.currentThread().setName(
-                                SelectorThread.IO_THREAD_NAME_PREFIX + 
connection.getRemoteAddress().toString());
-                        processConnection(connection);
-                    }
-                    finally
-                    {
-                        Thread.currentThread().setName(currentName);
-                    }
-                }
-            });
-
-            increaseCorePoolPoolSizeIfNecessary();
-        }
-    }
-
-    private void increaseCorePoolPoolSizeIfNecessary()
-    {
-        int currentPoolSize;
-        while((currentPoolSize = _executor.getCorePoolSize()) < 
_poolSizeMaximum && !_executor.getQueue().isEmpty())
-        {
-            // Currently we do not shrink the core pool size back to its 
original value
-            _executor.setCorePoolSize(currentPoolSize + 1);
-        }
-    }
-
-    private void processConnection(final NonBlockingConnection connection)
+    void processConnection(final NonBlockingConnection connection)
     {
+        Thread.currentThread().setName(
+                SelectorThread.IO_THREAD_NAME_PREFIX + 
connection.getRemoteAddress().toString());
         try
         {
             _running.incrementAndGet();
@@ -165,7 +133,8 @@ public class NetworkConnectionScheduler
                         connection.clearScheduled();
                         if(connection.isStateChanged())
                         {
-                            schedule(connection);
+                            connection.clearScheduled();
+                            _selectorThread.addToWork(connection);
                         }
                         else
                         {
@@ -186,6 +155,7 @@ public class NetworkConnectionScheduler
         {
             _running.decrementAndGet();
         }
+
     }
 
     public void close()
@@ -231,4 +201,14 @@ public class NetworkConnectionScheduler
     {
         _selectorThread.removeConnection(connection);
     }
+
+    int getPoolSizeMaximum()
+    {
+        return _poolSizeMaximum;
+    }
+
+    public void schedule(final NonBlockingConnection connection)
+    {
+        _selectorThread.addToWork(connection);
+    }
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1708894&r1=1708893&r2=1708894&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
 Thu Oct 15 21:14:02 2015
@@ -25,15 +25,16 @@ import java.net.SocketAddress;
 import java.nio.channels.SocketChannel;
 import java.security.Principal;
 import java.security.cert.Certificate;
-import java.util.*;
+import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.model.port.AmqpPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.transport.ByteBufferSender;
@@ -492,4 +493,5 @@ public class NonBlockingConnection imple
             _unexpectedByteBufferSizeUsed = true;
         }
     }
+
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1708894&r1=1708893&r2=1708894&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
 Thu Oct 15 21:14:02 2015
@@ -31,7 +31,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
@@ -56,37 +58,47 @@ class SelectorThread extends Thread
 
     private final Selector _selector;
     private final AtomicBoolean _closed = new AtomicBoolean();
+    private final AtomicBoolean _selecting = new AtomicBoolean();
     private final NetworkConnectionScheduler _scheduler;
     private long _nextTimeout;
 
-    SelectorThread(final NetworkConnectionScheduler scheduler) throws 
IOException
+    private final BlockingQueue<Runnable> _workQueue = new 
LinkedBlockingQueue<>();
+
+    private Runnable _selectionTask = new Runnable()
     {
-        super("Selector-" + scheduler.getName());
+        @Override
+        public void run()
+        {
+            performSelect();
+        }
+    };
 
+    SelectorThread(final NetworkConnectionScheduler scheduler) throws 
IOException
+    {
         _selector = Selector.open();
         _scheduler = scheduler;
+        _workQueue.add(_selectionTask);
     }
 
     public void addAcceptingSocket(final ServerSocketChannel socketChannel,
                                    final NonBlockingNetworkTransport 
nonBlockingNetworkTransport)
     {
         _tasks.add(new Runnable()
-                    {
-                        @Override
-                        public void run()
-                        {
+        {
+            @Override
+            public void run()
+            {
 
-                            try
-                            {
-                                socketChannel.register(_selector, 
SelectionKey.OP_ACCEPT, nonBlockingNetworkTransport);
-                            }
-                            catch (IllegalStateException | 
ClosedChannelException e)
-                            {
-                                // TODO Communicate condition back to model 
object to make it go into the ERROR state
-                                LOGGER.error("Failed to register selector on 
accepting port", e);
-                            }
-                        }
-                    });
+                try
+                {
+                    socketChannel.register(_selector, SelectionKey.OP_ACCEPT, 
nonBlockingNetworkTransport);
+                }
+                catch (IllegalStateException | ClosedChannelException e)
+                {
+                    // TODO Communicate condition back to model object to make 
it go into the ERROR state
+                    LOGGER.error("Failed to register selector on accepting 
port", e);
+                }             }
+        });
         _selector.wakeup();
     }
 
@@ -98,7 +110,7 @@ class SelectorThread extends Thread
             public void run()
             {
                 SelectionKey selectionKey = socketChannel.keyFor(_selector);
-                if(selectionKey != null)
+                if (selectionKey != null)
                 {
                     selectionKey.cancel();
                 }
@@ -107,90 +119,173 @@ class SelectorThread extends Thread
         _selector.wakeup();
     }
 
-    @Override
-    public void run()
+    private List<NonBlockingConnection> processUnscheduledConnections()
     {
+        List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
 
-        _nextTimeout = 0;
-
-        try
+        long currentTime = System.currentTimeMillis();
+        Iterator<NonBlockingConnection> iterator = 
_unscheduledConnections.iterator();
+        _nextTimeout = Integer.MAX_VALUE;
+        while (iterator.hasNext())
         {
-            while (!_closed.get())
-            {
+            NonBlockingConnection connection = iterator.next();
 
+            int period = connection.getTicker().getTimeToNextTick(currentTime);
+
+            if (period <= 0 || connection.isStateChanged())
+            {
+                toBeScheduled.add(connection);
                 try
                 {
-                    _selector.select(_nextTimeout);
+                    unregisterConnection(connection);
                 }
-                catch (IOException e)
+                catch (ClosedChannelException e)
                 {
-                    // TODO Inform the model object
-                    LOGGER.error("Failed to trying to select()",e );
-                    break;
+                    LOGGER.debug("Failed to register with selector for 
connection " + connection +
+                                 ". Connection is probably being closed by 
peer.", e);
                 }
+                iterator.remove();
+            }
+            else
+            {
+                _nextTimeout = Math.min(period, _nextTimeout);
+            }
+        }
 
-                runTasks();
-
-                List<NonBlockingConnection> toBeScheduled = 
processSelectionKeys();
+        return toBeScheduled;
+    }
 
-                toBeScheduled.addAll(reregisterUnregisteredConnections());
+    @Override
+    public void run()
+    {
 
-                toBeScheduled.addAll(processUnscheduledConnections());
+        final String name = Thread.currentThread().getName();
+        try
+        {
+            do
+            {
+                Thread.currentThread().setName(name);
+                Runnable task = _workQueue.take();
+                task.run();
 
-                for (NonBlockingConnection connection : toBeScheduled)
-                {
-                    _scheduler.schedule(connection);
-                }
-            }
+            } while (!_closed.get());
         }
-        finally
+        catch (InterruptedException e)
         {
-            try
+            Thread.currentThread().interrupt();
+        }
+
+    }
+
+    private void closeSelector()
+    {
+        try
+        {
+            if(_selector.isOpen())
             {
                 _selector.close();
             }
-            catch (IOException e)
-            {
-                LOGGER.debug("Failed to close selector", e);
-            }
         }
+        catch (IOException e)
 
+        {
+            LOGGER.debug("Failed to close selector", e);
+        }
     }
 
-    private List<NonBlockingConnection> processUnscheduledConnections()
+    private static final class ConnectionProcessor implements Runnable
     {
-        List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
 
-        long currentTime = System.currentTimeMillis();
-        Iterator<NonBlockingConnection> iterator = 
_unscheduledConnections.iterator();
-        _nextTimeout = Integer.MAX_VALUE;
-        while (iterator.hasNext())
+        private final NetworkConnectionScheduler _scheduler;
+        private final NonBlockingConnection _connection;
+        private AtomicBoolean _running = new AtomicBoolean();
+
+        public ConnectionProcessor(final NetworkConnectionScheduler scheduler, 
final NonBlockingConnection connection)
         {
-            NonBlockingConnection connection = iterator.next();
+            _scheduler = scheduler;
+            _connection = connection;
+        }
 
-            int period = connection.getTicker().getTimeToNextTick(currentTime);
+        @Override
+        public void run()
+        {
+            if(_running.compareAndSet(false,true))
+            {
+                _scheduler.processConnection(_connection);
+            }
+        }
+    }
 
-            if (period <= 0 || connection.isStateChanged())
+    private void performSelect()
+    {
+        while(!_closed.get())
+        {
+            if(_selecting.compareAndSet(false,true))
             {
-                toBeScheduled.add(connection);
+                List<ConnectionProcessor> connections = new ArrayList<>();
                 try
                 {
-                    unregisterConnection(connection);
+                    if (!_closed.get())
+                    {
+                        Thread.currentThread().setName("Selector-" + 
_scheduler.getName());
+                        try
+                        {
+                             _selector.select(_nextTimeout);
+                        }
+                        catch (IOException e)
+                        {
+                            // TODO Inform the model object
+                            LOGGER.error("Failed to trying to select()", e);
+                            closeSelector();
+                            return;
+                        }
+                        runTasks();
+                        for (NonBlockingConnection connection : 
processSelectionKeys())
+                        {
+                            if(connection.setScheduled())
+                            {
+                                connections.add(new 
ConnectionProcessor(_scheduler, connection));
+                            }
+                        }
+                        for (NonBlockingConnection connection : 
reregisterUnregisteredConnections())
+                        {
+                            if(connection.setScheduled())
+                            {
+                                connections.add(new 
ConnectionProcessor(_scheduler, connection));
+                            }
+                        }
+                        for (NonBlockingConnection connection : 
processUnscheduledConnections())
+                        {
+                            if(connection.setScheduled())
+                            {
+                                connections.add(new 
ConnectionProcessor(_scheduler, connection));
+                            }
+                        }
+
+                    }
                 }
-                catch (ClosedChannelException e)
+                finally
                 {
-                    LOGGER.debug("Failed to register with selector for 
connection " + connection +
-                                 ". Connection is probably being closed by 
peer.", e);
+                    _selecting.set(false);
                 }
-                iterator.remove();
+                _workQueue.add(_selectionTask);
+                _workQueue.addAll(connections);
+                for(ConnectionProcessor connectionProcessor : connections)
+                {
+                    connectionProcessor.run();
+                }
+
             }
             else
             {
-                _nextTimeout = Math.min(period, _nextTimeout);
+                break;
             }
         }
 
-        return toBeScheduled;
+        if(_closed.get() && _selecting.compareAndSet(false,true))
+        {
+            closeSelector();
+        }
     }
 
     private void unregisterConnection(final NonBlockingConnection connection) 
throws ClosedChannelException
@@ -318,8 +413,32 @@ class SelectorThread extends Thread
 
     public void close()
     {
+        Runnable goodNight = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                // Make sure take awakes so it can observe _closed
+            }
+        };
         _closed.set(true);
+
+        int count = _scheduler.getPoolSizeMaximum();
+        while(count-- > 0)
+        {
+            _workQueue.offer(goodNight);
+        }
+
         _selector.wakeup();
+
     }
 
+     public void addToWork(final NonBlockingConnection connection)
+     {
+         if(connection.setScheduled())
+         {
+             _workQueue.add(new ConnectionProcessor(_scheduler, connection));
+         }
+
+     }
 }

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/adapter/BrokerAdapterTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/adapter/BrokerAdapterTest.java?rev=1708894&r1=1708893&r2=1708894&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/adapter/BrokerAdapterTest.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/adapter/BrokerAdapterTest.java
 Thu Oct 15 21:14:02 2015
@@ -46,6 +46,7 @@ public class BrokerAdapterTest extends Q
 
     private TaskExecutorImpl _taskExecutor;
     private SystemConfig _systemConfig;
+    private BrokerAdapter _brokerAdapter;
 
     @Override
     public void setUp() throws Exception
@@ -66,12 +67,18 @@ public class BrokerAdapterTest extends Q
     @Override
     public void tearDown() throws Exception
     {
-       try
+        try
         {
+            if (_brokerAdapter != null)
+            {
+                _brokerAdapter.close();
+            }
+
             if (_taskExecutor != null)
             {
                 _taskExecutor.stopImmediately();
             }
+
         }
         finally
         {
@@ -112,11 +119,14 @@ public class BrokerAdapterTest extends Q
 
         // testing unsuccessful case
         attributes.put(Broker.CONTEXT, 
Collections.singletonMap(Broker.NETWORK_BUFFER_SIZE, 
Broker.MINIMUM_NETWORK_BUFFER_SIZE - 1));
-        BrokerAdapter brokerAdapter = new BrokerAdapter(attributes, 
_systemConfig);
-        brokerAdapter.open();
-        assertEquals("Broker open should fail with network buffer size less 
then minimum", State.ERRORED, brokerAdapter.getState());
-        assertEquals("Unexpected buffer size", 
Broker.DEFAULT_NETWORK_BUFFER_SIZE, brokerAdapter.getNetworkBufferSize());
-        brokerAdapter.close();
+        _brokerAdapter = new BrokerAdapter(attributes, _systemConfig);
+        _brokerAdapter.open();
+        assertEquals("Broker open should fail with network buffer size less 
then minimum",
+                     State.ERRORED,
+                     _brokerAdapter.getState());
+        assertEquals("Unexpected buffer size",
+                     Broker.DEFAULT_NETWORK_BUFFER_SIZE,
+                     _brokerAdapter.getNetworkBufferSize());
     }
 
     private void doAssignTargetSizeTest(final long[] virtualHostQueueSizes, 
final long flowToDiskThreshold)
@@ -124,16 +134,16 @@ public class BrokerAdapterTest extends Q
         Map<String, Object> attributes = new HashMap<>();
         attributes.put("name", "Broker");
         attributes.put("context", 
Collections.singletonMap(Broker.BROKER_FLOW_TO_DISK_THRESHOLD, 
flowToDiskThreshold));
-        BrokerAdapter brokerAdapter = new BrokerAdapter(attributes, 
_systemConfig);
-        brokerAdapter.open();
+        _brokerAdapter = new BrokerAdapter(attributes, _systemConfig);
+        _brokerAdapter.open();
 
         for(int i=0; i < virtualHostQueueSizes.length; i++)
         {
-            createVhnWithVh(brokerAdapter, i, virtualHostQueueSizes[i]);
+            createVhnWithVh(_brokerAdapter, i, virtualHostQueueSizes[i]);
         }
 
         long totalAssignedTargetSize = 0;
-        for(VirtualHostNode<?> vhn : brokerAdapter.getVirtualHostNodes())
+        for(VirtualHostNode<?> vhn : _brokerAdapter.getVirtualHostNodes())
         {
             long targetSize = vhn.getVirtualHost().getTargetSize();
             assertTrue("A virtualhost's target size cannot be zero", 
targetSize > 0);
@@ -141,7 +151,7 @@ public class BrokerAdapterTest extends Q
         }
 
         long diff = Math.abs(flowToDiskThreshold - totalAssignedTargetSize);
-        long tolerance = brokerAdapter.getVirtualHostNodes().size() * 2;
+        long tolerance = _brokerAdapter.getVirtualHostNodes().size() * 2;
         assertTrue(String.format("Assigned target size not within expected 
tolerance. Diff %d Tolerance %d", diff, tolerance), diff < tolerance);
     }
 

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java?rev=1708894&r1=1708893&r2=1708894&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
 Thu Oct 15 21:14:02 2015
@@ -19,11 +19,11 @@
 
 package org.apache.qpid.server.transport;
 
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.TransportEncryption;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -33,7 +33,11 @@ import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.EnumSet;
 
-import static org.mockito.Mockito.*;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.TransportEncryption;
 
 public class NetworkConnectionSchedulerTest extends QpidTestCase
 {
@@ -57,7 +61,7 @@ public class NetworkConnectionSchedulerT
         when(verboseEngine.getAggregateTicker()).thenReturn(new 
AggregateTicker());
         when(timidEngine.getAggregateTicker()).thenReturn(new 
AggregateTicker());
 
-        final NetworkConnectionScheduler scheduler = new 
NetworkConnectionScheduler(getName(), 1, 1, 1000);
+        final NetworkConnectionScheduler scheduler = new 
NetworkConnectionScheduler(getName(), 2, 2, 1000);
         scheduler.start();
         NonBlockingNetworkTransport transport = new 
NonBlockingNetworkTransport(engineFactory, EnumSet.of(TransportEncryption.NONE),
                                                                                
 scheduler, port);

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1708894&r1=1708893&r2=1708894&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
 Thu Oct 15 21:14:02 2015
@@ -108,8 +108,8 @@ public class TCPandSSLTransportTest exte
         when(port.getPort()).thenReturn(0);
         when(port.getNetworkBufferSize()).thenReturn(64*1024);
         
when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
-        when(port.getThreadPoolMinimum()).thenReturn(1);
-        when(port.getThreadPoolMaximum()).thenReturn(1);
+        when(port.getThreadPoolMinimum()).thenReturn(2);
+        when(port.getThreadPoolMaximum()).thenReturn(2);
         when(port.getSSLContext()).thenReturn(sslContext);
         when(port.getContextValue(Long.class, 
AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
 

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java?rev=1708894&r1=1708893&r2=1708894&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhostalias/VirtualHostAliasTest.java
 Thu Oct 15 21:14:02 2015
@@ -88,6 +88,10 @@ public class VirtualHostAliasTest extend
     protected void tearDown() throws Exception
     {
         _port.close();
+        for (VirtualHost vhost : _vhosts.values())
+        {
+            vhost.close();
+        }
         super.tearDown();
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to