Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x 4cdd188ef -> 354265754


https://issues.apache.org/jira/browse/AMQ-6524

Fixing a thread safety issue with memoryUsage when using 
concurrentStoreAndDispatch
that was causing memory usage to get out of sync.

The InnerFutureTask class inside KahaDB was not thread safe which was
the root cause of the problem.

(cherry picked from commit 0f0bdb21ef97c39ec7d54d164e07921611c1de08)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/35426575
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/35426575
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/35426575

Branch: refs/heads/activemq-5.14.x
Commit: 3542657541e3581d77282654eb19ebae42014dbf
Parents: 4cdd188
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Fri Dec 2 12:19:53 2016 -0500
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Mon Dec 5 07:12:59 2016 -0500

----------------------------------------------------------------------
 .../java/org/apache/activemq/store/kahadb/KahaDBStore.java   | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/35426575/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 5a723c9..1c7b3e7 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.BaseDestination;
@@ -1435,10 +1436,10 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
 
         private class InnerFutureTask extends FutureTask<Object> implements 
ListenableFuture<Object>  {
 
-            private Runnable listener;
+            private final AtomicReference<Runnable> listenerRef = new 
AtomicReference<>();
+
             public InnerFutureTask(Runnable runnable) {
                 super(runnable, null);
-
             }
 
             public void setException(final Exception e) {
@@ -1456,13 +1457,14 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
 
             @Override
             public void addListener(Runnable listener) {
-                this.listener = listener;
+                this.listenerRef.set(listener);
                 if (isDone()) {
                     fireListener();
                 }
             }
 
             private void fireListener() {
+                Runnable listener = listenerRef.getAndSet(null);
                 if (listener != null) {
                     try {
                         listener.run();

Reply via email to