Repository: activemq Updated Branches: refs/heads/activemq-5.14.x e4da98bd7 -> 161ba22f6
https://issues.apache.org/jira/browse/AMQ-6560 Converting flags in TransportConnection to AtomicBoolean to reduce synchronization and improve thread safety (cherry picked from commit bdec3f6ddb7f1417690f3c89d07ea77f0d6d96e5) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/161ba22f Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/161ba22f Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/161ba22f Branch: refs/heads/activemq-5.14.x Commit: 161ba22f61a8d1d9d0380e75ec6d02353b88aca9 Parents: e4da98b Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Thu Jan 12 07:36:16 2017 -0500 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Thu Jan 12 07:41:30 2017 -0500 ---------------------------------------------------------------------- .../activemq/broker/TransportConnection.java | 38 ++++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/161ba22f/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index a32d4f6..0507f2a 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -144,8 +144,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private boolean blocked; private boolean connected; private boolean active; - private boolean starting; - private boolean pendingStop; + private final AtomicBoolean starting = new AtomicBoolean(); + private final AtomicBoolean pendingStop = new AtomicBoolean(); private long timeStamp; private final AtomicBoolean stopping = new AtomicBoolean(false); private final CountDownLatch stopped = new CountDownLatch(1); @@ -235,7 +235,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } public void serviceTransportException(IOException e) { - if (!stopping.get() && !pendingStop) { + if (!stopping.get() && !pendingStop.get()) { transportException.set(e); if (TRANSPORTLOG.isDebugEnabled()) { TRANSPORTLOG.debug(this + " failed: " + e, e); @@ -314,7 +314,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } ConnectionError ce = new ConnectionError(); ce.setException(e); - if (pendingStop) { + if (pendingStop.get()) { dispatchSync(ce); } else { dispatchAsync(ce); @@ -332,7 +332,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { boolean responseRequired = command.isResponseRequired(); int commandId = command.getCommandId(); try { - if (!pendingStop) { + if (!pendingStop.get()) { response = command.visit(this); } else { response = new ExceptionResponse(transportException.get()); @@ -1004,7 +1004,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { @Override public boolean iterate() { try { - if (pendingStop || stopping.get()) { + if (pendingStop.get() || stopping.get()) { if (dispatchStopped.compareAndSet(false, true)) { if (transportException.get() == null) { try { @@ -1062,7 +1062,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public void start() throws Exception { try { synchronized (this) { - starting = true; + starting.set(true); if (taskRunnerFactory != null) { taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " + getRemoteAddress()); @@ -1083,7 +1083,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } catch (Exception e) { // Force clean up on an error starting up. - pendingStop = true; + pendingStop.set(true); throw e; } finally { // stop() can be called from within the above block, @@ -1111,7 +1111,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public void delayedStop(final int waitTime, final String reason, Throwable cause) { if (waitTime > 0) { synchronized (this) { - pendingStop = true; + pendingStop.set(true); transportException.set(cause); } try { @@ -1140,8 +1140,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public void stopAsync() { // If we're in the middle of starting then go no further... for now. synchronized (this) { - pendingStop = true; - if (starting) { + pendingStop.set(true); + if (starting.get()) { LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes.."); return; } @@ -1352,8 +1352,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { /** * @return true if the Connection is starting */ - public synchronized boolean isStarting() { - return starting; + public boolean isStarting() { + return starting.get(); } @Override @@ -1366,19 +1366,19 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return this.faultTolerantConnection; } - protected synchronized void setStarting(boolean starting) { - this.starting = starting; + protected void setStarting(boolean starting) { + this.starting.set(starting); } /** * @return true if the Connection needs to stop */ - public synchronized boolean isPendingStop() { - return pendingStop; + public boolean isPendingStop() { + return pendingStop.get(); } - protected synchronized void setPendingStop(boolean pendingStop) { - this.pendingStop = pendingStop; + protected void setPendingStop(boolean pendingStop) { + this.pendingStop.set(pendingStop); } private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {