Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x e4da98bd7 -> 161ba22f6


https://issues.apache.org/jira/browse/AMQ-6560

Converting flags in TransportConnection to AtomicBoolean to reduce
synchronization and improve thread safety

(cherry picked from commit bdec3f6ddb7f1417690f3c89d07ea77f0d6d96e5)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/161ba22f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/161ba22f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/161ba22f

Branch: refs/heads/activemq-5.14.x
Commit: 161ba22f61a8d1d9d0380e75ec6d02353b88aca9
Parents: e4da98b
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Thu Jan 12 07:36:16 2017 -0500
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Thu Jan 12 07:41:30 2017 -0500

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    | 38 ++++++++++----------
 1 file changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/161ba22f/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index a32d4f6..0507f2a 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -144,8 +144,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     private boolean blocked;
     private boolean connected;
     private boolean active;
-    private boolean starting;
-    private boolean pendingStop;
+    private final AtomicBoolean starting = new AtomicBoolean();
+    private final AtomicBoolean pendingStop = new AtomicBoolean();
     private long timeStamp;
     private final AtomicBoolean stopping = new AtomicBoolean(false);
     private final CountDownLatch stopped = new CountDownLatch(1);
@@ -235,7 +235,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     }
 
     public void serviceTransportException(IOException e) {
-        if (!stopping.get() && !pendingStop) {
+        if (!stopping.get() && !pendingStop.get()) {
             transportException.set(e);
             if (TRANSPORTLOG.isDebugEnabled()) {
                 TRANSPORTLOG.debug(this + " failed: " + e, e);
@@ -314,7 +314,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 }
                 ConnectionError ce = new ConnectionError();
                 ce.setException(e);
-                if (pendingStop) {
+                if (pendingStop.get()) {
                     dispatchSync(ce);
                 } else {
                     dispatchAsync(ce);
@@ -332,7 +332,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
         boolean responseRequired = command.isResponseRequired();
         int commandId = command.getCommandId();
         try {
-            if (!pendingStop) {
+            if (!pendingStop.get()) {
                 response = command.visit(this);
             } else {
                 response = new ExceptionResponse(transportException.get());
@@ -1004,7 +1004,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     @Override
     public boolean iterate() {
         try {
-            if (pendingStop || stopping.get()) {
+            if (pendingStop.get() || stopping.get()) {
                 if (dispatchStopped.compareAndSet(false, true)) {
                     if (transportException.get() == null) {
                         try {
@@ -1062,7 +1062,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     public void start() throws Exception {
         try {
             synchronized (this) {
-                starting = true;
+                starting.set(true);
                 if (taskRunnerFactory != null) {
                     taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
                             + getRemoteAddress());
@@ -1083,7 +1083,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
             }
         } catch (Exception e) {
             // Force clean up on an error starting up.
-            pendingStop = true;
+            pendingStop.set(true);
             throw e;
         } finally {
             // stop() can be called from within the above block,
@@ -1111,7 +1111,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     public void delayedStop(final int waitTime, final String reason, Throwable cause) {
         if (waitTime > 0) {
             synchronized (this) {
-                pendingStop = true;
+                pendingStop.set(true);
                 transportException.set(cause);
             }
             try {
@@ -1140,8 +1140,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     public void stopAsync() {
         // If we're in the middle of starting then go no further... for now.
         synchronized (this) {
-            pendingStop = true;
-            if (starting) {
+            pendingStop.set(true);
+            if (starting.get()) {
                 LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
                 return;
             }
@@ -1352,8 +1352,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     /**
      * @return true if the Connection is starting
      */
-    public synchronized boolean isStarting() {
-        return starting;
+    public boolean isStarting() {
+        return starting.get();
     }
 
     @Override
@@ -1366,19 +1366,19 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
         return this.faultTolerantConnection;
     }
 
-    protected synchronized void setStarting(boolean starting) {
-        this.starting = starting;
+    protected void setStarting(boolean starting) {
+        this.starting.set(starting);
     }
 
     /**
      * @return true if the Connection needs to stop
      */
-    public synchronized boolean isPendingStop() {
-        return pendingStop;
+    public boolean isPendingStop() {
+        return pendingStop.get();
     }
 
-    protected synchronized void setPendingStop(boolean pendingStop) {
-        this.pendingStop = pendingStop;
+    protected void setPendingStop(boolean pendingStop) {
+        this.pendingStop.set(pendingStop);
     }
 
     private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {

Reply via email to