Author: kwall
Date: Wed Mar 11 15:55:25 2015
New Revision: 1665911
URL: http://svn.apache.org/r1665911
Log:
Bug fix: Delay shutting download the Port's executor until the port has no
remaining connections
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1665911&r1=1665910&r2=1665911&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
Wed Mar 11 15:55:25 2015
@@ -644,7 +644,7 @@ public abstract class AbstractConfigured
@Override
public ListenableFuture<Void> call() throws Exception
{
- LOGGER.debug("Closing " + getClass().getSimpleName() + " : " +
getName());
+ LOGGER.debug("Closing " +
AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
if(_dynamicState.compareAndSet(DynamicState.OPENED,
DynamicState.CLOSED))
{
@@ -669,7 +669,7 @@ public abstract class AbstractConfigured
}
else
{
- LOGGER.debug("Closed " + getClass().getSimpleName() + " :
" + getName());
+ LOGGER.debug("Closed " +
AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
return Futures.immediateFuture(null);
}
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1665911&r1=1665910&r2=1665911&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
Wed Mar 11 15:55:25 2015
@@ -40,6 +40,8 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -118,6 +120,8 @@ public class AmqpPortImpl extends Abstra
private final Broker<?> _broker;
private AcceptingTransport _transport;
+ private final AtomicBoolean _closing = new AtomicBoolean();
+ private final SettableFuture _noConnectionsRemain =
SettableFuture.create();
@ManagedObjectFactoryConstructor
public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker)
@@ -254,6 +258,19 @@ public class AmqpPortImpl extends Abstra
}
@Override
+ protected ListenableFuture<Void> beforeClose()
+ {
+ _closing.set(true);
+
+ if (_connectionCount.get() == 0)
+ {
+ _noConnectionsRemain.set(null);
+ }
+
+ return _noConnectionsRemain;
+ }
+
+ @Override
protected void onClose()
{
if (_transport != null)
@@ -262,6 +279,8 @@ public class AmqpPortImpl extends Abstra
{
_broker.getEventLogger().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport),
getPort()));
}
+
+
_transport.close();
}
}
@@ -500,6 +519,11 @@ public class AmqpPortImpl extends Abstra
_connectionCountWarningGiven.compareAndSet(true,false);
}
+ if (_closing.get() && _connectionCount.get() == 0)
+ {
+ _noConnectionsRemain.set(null);
+ }
+
return openConnections;
}
@@ -511,7 +535,7 @@ public class AmqpPortImpl extends Abstra
@Override
public boolean canAcceptNewConnection(final SocketAddress
remoteSocketAddress)
{
- return _maxOpenConnections < 0 || _connectionCount.get() <
_maxOpenConnections;
+ return !_closing.get() && ( _maxOpenConnections < 0 ||
_connectionCount.get() < _maxOpenConnections );
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]