Repository: activemq Updated Branches: refs/heads/master 3a0a7238b -> bdec3f6dd
https://issues.apache.org/jira/browse/AMQ-6560 Converting flags in TransportConnection to AtomicBoolean to reduce synchronization and improve thread safety Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bdec3f6d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bdec3f6d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bdec3f6d Branch: refs/heads/master Commit: bdec3f6ddb7f1417690f3c89d07ea77f0d6d96e5 Parents: 3a0a723 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Thu Jan 12 07:36:16 2017 -0500 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Thu Jan 12 07:36:50 2017 -0500 ---------------------------------------------------------------------- .../activemq/broker/TransportConnection.java | 38 ++++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/bdec3f6d/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index ac72534..afc27c3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -137,8 +137,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private boolean blocked; private boolean connected; private boolean active; - private boolean starting; - private boolean pendingStop; + private final AtomicBoolean starting = new AtomicBoolean(); + private final AtomicBoolean pendingStop = new AtomicBoolean(); private long timeStamp; private final AtomicBoolean stopping = new AtomicBoolean(false); private final CountDownLatch stopped = new CountDownLatch(1); @@ -224,7 +224,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } public void serviceTransportException(IOException e) { - if (!stopping.get() && !pendingStop) { + if (!stopping.get() && !pendingStop.get()) { transportException.set(e); if (TRANSPORTLOG.isDebugEnabled()) { TRANSPORTLOG.debug(this + " failed: " + e, e); @@ -303,7 +303,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } ConnectionError ce = new ConnectionError(); ce.setException(e); - if (pendingStop) { + if (pendingStop.get()) { dispatchSync(ce); } else { dispatchAsync(ce); @@ -324,7 +324,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { if (brokerService.isStopping()) { response = responseRequired ? new ExceptionResponse( new BrokerStoppedException("Broker " + brokerService + " is being stopped")) : null; - } else if (!pendingStop) { + } else if (!pendingStop.get()) { response = command.visit(this); } else { response = responseRequired ? new ExceptionResponse(transportException.get()) : null; @@ -993,7 +993,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { @Override public boolean iterate() { try { - if (pendingStop || stopping.get()) { + if (pendingStop.get() || stopping.get()) { if (dispatchStopped.compareAndSet(false, true)) { if (transportException.get() == null) { try { @@ -1051,7 +1051,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public void start() throws Exception { try { synchronized (this) { - starting = true; + starting.set(true); if (taskRunnerFactory != null) { taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " + getRemoteAddress()); @@ -1072,7 +1072,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } catch (Exception e) { // Force clean up on an error starting up. - pendingStop = true; + pendingStop.set(true); throw e; } finally { // stop() can be called from within the above block, @@ -1100,7 +1100,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public void delayedStop(final int waitTime, final String reason, Throwable cause) { if (waitTime > 0) { synchronized (this) { - pendingStop = true; + pendingStop.set(true); transportException.set(cause); } try { @@ -1129,8 +1129,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public void stopAsync() { // If we're in the middle of starting then go no further... for now. synchronized (this) { - pendingStop = true; - if (starting) { + pendingStop.set(true); + if (starting.get()) { LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes.."); return; } @@ -1341,8 +1341,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { /** * @return true if the Connection is starting */ - public synchronized boolean isStarting() { - return starting; + public boolean isStarting() { + return starting.get(); } @Override @@ -1355,19 +1355,19 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return this.faultTolerantConnection; } - protected synchronized void setStarting(boolean starting) { - this.starting = starting; + protected void setStarting(boolean starting) { + this.starting.set(starting); } /** * @return true if the Connection needs to stop */ - public synchronized boolean isPendingStop() { - return pendingStop; + public boolean isPendingStop() { + return pendingStop.get(); } - protected synchronized void setPendingStop(boolean pendingStop) { - this.pendingStop = pendingStop; + protected void setPendingStop(boolean pendingStop) { + this.pendingStop.set(pendingStop); } private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {