Repository: activemq
Updated Branches:
  refs/heads/master 078f39f58 -> 8b23e072e
https://issues.apache.org/jira/browse/AMQ-5785

Avoid holding the intrinsic lock on the cursor when expiring the messages.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8b23e072
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8b23e072
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8b23e072

Branch: refs/heads/master
Commit: 8b23e072eeab2beebf62fd267bf8d9f88d05b5c2
Parents: 078f39f
Author: Timothy Bish <[email protected]>
Authored: Tue Mar 8 13:37:58 2016 -0500
Committer: Timothy Bish <[email protected]>
Committed: Tue Mar 8 13:37:58 2016 -0500

----------------------------------------------------------------------
 .../cursors/FilePendingMessageCursor.java | 40 +++++++++++++-------
 1 file changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b23e072/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 9c9a8e7..3c2bd5f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,10 +17,13 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; @@ -31,25 +34,26 @@ import org.apache.activemq.command.Message; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.PList; -import org.apache.activemq.store.PListStore; import org.apache.activemq.store.PListEntry; +import org.apache.activemq.store.PListStore; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; +import org.apache.activemq.util.ByteSequence; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.activemq.util.ByteSequence; /** * persist pending messages pending message (messages awaiting dispatch to a * consumer) cursor - * - * */ public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { + static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class); + private static final AtomicLong NAME_COUNT = new AtomicLong(); + protected Broker broker; private final PListStore store; private final String name; @@ -61,6 +65,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple private boolean flushRequired; private final AtomicBoolean started = new AtomicBoolean(); private final WireFormat wireFormat = new OpenWireFormat(); + /** * @param broker * @param name @@ -374,9 +379,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple @Override 
public synchronized boolean isFull() { - return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull()); - } @Override @@ -392,11 +395,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple @Override public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { if (newPercentUsage >= getMemoryUsageHighWaterMark()) { + List<MessageReference> expiredMessages = null; synchronized (this) { if (!flushRequired && size() != 0) { flushRequired =true; if (!iterating) { - expireOldMessages(); + expiredMessages = expireOldMessages(); if (!hasSpace()) { flushToDisk(); flushRequired = false; @@ -404,6 +408,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple } } } + + if (expiredMessages != null) { + for (MessageReference node : expiredMessages) { + discardExpiredMessage(node); + } + } } } @@ -412,26 +422,30 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple return true; } - protected synchronized void expireOldMessages() { + private synchronized List<MessageReference> expireOldMessages() { + List<MessageReference> expired = new ArrayList<MessageReference>(); if (!memoryList.isEmpty()) { for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { MessageReference node = iterator.next(); if (node.isExpired()) { node.decrementReferenceCount(); - discardExpiredMessage(node); + expired.add(node); iterator.remove(); } } } + + return expired; } protected synchronized void flushToDisk() { if (!memoryList.isEmpty() && store != null) { long start = 0; - if (LOG.isTraceEnabled()) { + if (LOG.isTraceEnabled()) { start = System.currentTimeMillis(); - LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[]{ name, memoryList.size(), (systemUsage != null ? 
systemUsage.getMemoryUsage() : "") }); - } + LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[] { name, memoryList.size(), + (systemUsage != null ? systemUsage.getMemoryUsage() : "") }); + } for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { MessageReference node = iterator.next(); node.decrementReferenceCount();
