Author: kwall
Date: Wed Mar 11 15:55:25 2015
New Revision: 1665911

URL: http://svn.apache.org/r1665911
Log:
Bug fix: Delay shutting download the Port's executor until the port has no 
remaining connections

Modified:
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1665911&r1=1665910&r2=1665911&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
 Wed Mar 11 15:55:25 2015
@@ -644,7 +644,7 @@ public abstract class AbstractConfigured
             @Override
             public ListenableFuture<Void> call() throws Exception
             {
-                LOGGER.debug("Closing " + getClass().getSimpleName() + " : " + 
getName());
+                LOGGER.debug("Closing " + 
AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
 
                 if(_dynamicState.compareAndSet(DynamicState.OPENED, 
DynamicState.CLOSED))
                 {
@@ -669,7 +669,7 @@ public abstract class AbstractConfigured
                 }
                 else
                 {
-                    LOGGER.debug("Closed " + getClass().getSimpleName() + " : 
" + getName());
+                    LOGGER.debug("Closed " + 
AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
 
                     return Futures.immediateFuture(null);
                 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1665911&r1=1665910&r2=1665911&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
 Wed Mar 11 15:55:25 2015
@@ -40,6 +40,8 @@ import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import org.apache.qpid.server.configuration.BrokerProperties;
@@ -118,6 +120,8 @@ public class AmqpPortImpl extends Abstra
 
     private final Broker<?> _broker;
     private AcceptingTransport _transport;
+    private final AtomicBoolean _closing = new AtomicBoolean();
+    private final SettableFuture _noConnectionsRemain = 
SettableFuture.create();
 
     @ManagedObjectFactoryConstructor
     public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker)
@@ -254,6 +258,19 @@ public class AmqpPortImpl extends Abstra
     }
 
     @Override
+    protected ListenableFuture<Void> beforeClose()
+    {
+        _closing.set(true);
+
+        if (_connectionCount.get() == 0)
+        {
+            _noConnectionsRemain.set(null);
+        }
+
+        return _noConnectionsRemain;
+    }
+
+    @Override
     protected void onClose()
     {
         if (_transport != null)
@@ -262,6 +279,8 @@ public class AmqpPortImpl extends Abstra
             {
                 
_broker.getEventLogger().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport),
 getPort()));
             }
+
+
             _transport.close();
         }
     }
@@ -500,6 +519,11 @@ public class AmqpPortImpl extends Abstra
            _connectionCountWarningGiven.compareAndSet(true,false);
         }
 
+        if (_closing.get() && _connectionCount.get() == 0)
+        {
+            _noConnectionsRemain.set(null);
+        }
+
         return openConnections;
     }
 
@@ -511,7 +535,7 @@ public class AmqpPortImpl extends Abstra
     @Override
     public boolean canAcceptNewConnection(final SocketAddress 
remoteSocketAddress)
     {
-        return _maxOpenConnections < 0 || _connectionCount.get() < 
_maxOpenConnections;
+        return !_closing.get() && ( _maxOpenConnections < 0 || 
_connectionCount.get() < _maxOpenConnections );
     }
 
     @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to