Repository: activemq Updated Branches: refs/heads/activemq-5.14.x 4cdd188ef -> 354265754
https://issues.apache.org/jira/browse/AMQ-6524 Fixing a thread safety issue with memoryUsage when using concurrentStoreAndDispatch that was causing memory usage to get out of sync. The InnerFutureTask class inside KahaDB was not thread safe which was the root cause of the problem. (cherry picked from commit 0f0bdb21ef97c39ec7d54d164e07921611c1de08) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/35426575 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/35426575 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/35426575 Branch: refs/heads/activemq-5.14.x Commit: 3542657541e3581d77282654eb19ebae42014dbf Parents: 4cdd188 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Fri Dec 2 12:19:53 2016 -0500 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Mon Dec 5 07:12:59 2016 -0500 ---------------------------------------------------------------------- .../java/org/apache/activemq/store/kahadb/KahaDBStore.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/35426575/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 5a723c9..1c7b3e7 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.BaseDestination; @@ -1435,10 +1436,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> { - private Runnable listener; + private final AtomicReference<Runnable> listenerRef = new AtomicReference<>(); + public InnerFutureTask(Runnable runnable) { super(runnable, null); - } public void setException(final Exception e) { @@ -1456,13 +1457,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, @Override public void addListener(Runnable listener) { - this.listener = listener; + this.listenerRef.set(listener); if (isDone()) { fireListener(); } } private void fireListener() { + Runnable listener = listenerRef.getAndSet(null); if (listener != null) { try { listener.run();