Repository: activemq
Updated Branches:
  refs/heads/master 3a0a7238b -> bdec3f6dd


https://issues.apache.org/jira/browse/AMQ-6560

Converting flags in TransportConnection to AtomicBoolean to reduce
synchronization and improve thread safety


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bdec3f6d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bdec3f6d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bdec3f6d

Branch: refs/heads/master
Commit: bdec3f6ddb7f1417690f3c89d07ea77f0d6d96e5
Parents: 3a0a723
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Thu Jan 12 07:36:16 2017 -0500
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Thu Jan 12 07:36:50 2017 -0500

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    | 38 ++++++++++----------
 1 file changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bdec3f6d/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index ac72534..afc27c3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -137,8 +137,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     private boolean blocked;
     private boolean connected;
     private boolean active;
-    private boolean starting;
-    private boolean pendingStop;
+    private final AtomicBoolean starting = new AtomicBoolean();
+    private final AtomicBoolean pendingStop = new AtomicBoolean();
     private long timeStamp;
     private final AtomicBoolean stopping = new AtomicBoolean(false);
     private final CountDownLatch stopped = new CountDownLatch(1);
@@ -224,7 +224,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     }
 
     public void serviceTransportException(IOException e) {
-        if (!stopping.get() && !pendingStop) {
+        if (!stopping.get() && !pendingStop.get()) {
             transportException.set(e);
             if (TRANSPORTLOG.isDebugEnabled()) {
                 TRANSPORTLOG.debug(this + " failed: " + e, e);
@@ -303,7 +303,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 }
                 ConnectionError ce = new ConnectionError();
                 ce.setException(e);
-                if (pendingStop) {
+                if (pendingStop.get()) {
                     dispatchSync(ce);
                 } else {
                     dispatchAsync(ce);
@@ -324,7 +324,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
             if (brokerService.isStopping()) {
                 response = responseRequired ? new ExceptionResponse(
                     new BrokerStoppedException("Broker " + brokerService + " is being stopped")) : null;
-            } else if (!pendingStop) {
+            } else if (!pendingStop.get()) {
                 response = command.visit(this);
             } else {
                 response = responseRequired ? new ExceptionResponse(transportException.get()) : null;
@@ -993,7 +993,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     @Override
     public boolean iterate() {
         try {
-            if (pendingStop || stopping.get()) {
+            if (pendingStop.get() || stopping.get()) {
                 if (dispatchStopped.compareAndSet(false, true)) {
                     if (transportException.get() == null) {
                         try {
@@ -1051,7 +1051,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     public void start() throws Exception {
         try {
             synchronized (this) {
-                starting = true;
+                starting.set(true);
                 if (taskRunnerFactory != null) {
                     taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
                             + getRemoteAddress());
@@ -1072,7 +1072,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
             }
         } catch (Exception e) {
             // Force clean up on an error starting up.
-            pendingStop = true;
+            pendingStop.set(true);
             throw e;
         } finally {
             // stop() can be called from within the above block,
@@ -1100,7 +1100,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     public void delayedStop(final int waitTime, final String reason, Throwable cause) {
         if (waitTime > 0) {
             synchronized (this) {
-                pendingStop = true;
+                pendingStop.set(true);
                 transportException.set(cause);
             }
             try {
@@ -1129,8 +1129,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     public void stopAsync() {
         // If we're in the middle of starting then go no further... for now.
         synchronized (this) {
-            pendingStop = true;
-            if (starting) {
+            pendingStop.set(true);
+            if (starting.get()) {
                 LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
                 return;
             }
@@ -1341,8 +1341,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     /**
      * @return true if the Connection is starting
      */
-    public synchronized boolean isStarting() {
-        return starting;
+    public boolean isStarting() {
+        return starting.get();
     }
 
     @Override
@@ -1355,19 +1355,19 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
         return this.faultTolerantConnection;
     }
 
-    protected synchronized void setStarting(boolean starting) {
-        this.starting = starting;
+    protected void setStarting(boolean starting) {
+        this.starting.set(starting);
     }
 
     /**
      * @return true if the Connection needs to stop
      */
-    public synchronized boolean isPendingStop() {
-        return pendingStop;
+    public boolean isPendingStop() {
+        return pendingStop.get();
     }
 
-    protected synchronized void setPendingStop(boolean pendingStop) {
-        this.pendingStop = pendingStop;
+    protected void setPendingStop(boolean pendingStop) {
+        this.pendingStop.set(pendingStop);
     }
 
     private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {

Reply via email to