Repository: activemq Updated Branches: refs/heads/activemq-5.13.x 0ba9f9340 -> c8a805def
https://issues.apache.org/jira/browse/AMQ-6287

Properly enclosing the indexLock in a try/finally inside of
AckCompactionRunner

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c8a805de
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c8a805de
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c8a805de

Branch: refs/heads/activemq-5.13.x
Commit: c8a805deffc0315ccf3bcb5591fafe8b22e5bd60
Parents: 0ba9f93
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Wed May 11 12:35:30 2016 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Wed May 11 12:35:30 2016 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java | 43 +++++++++++---------
 1 file changed, 24 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/c8a805de/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 24a2636..96b5800 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1865,32 +1865,37 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         @Override
         public void run() {
+
+            int journalToAdvance = -1;
+            Set<Integer> journalLogsReferenced = new HashSet<Integer>();
+
             // Lock index to capture the ackMessageFileMap data
             indexLock.writeLock().lock();

-            // Map keys might not be sorted, find the earliest log file to forward acks
-            // from and move only those, future cycles can chip away at more as needed.
-            // We won't move files that are themselves rewritten on a previous compaction.
-            List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
-            Collections.sort(journalFileIds);
-            int journalToAdvance = -1;
-            for (Integer journalFileId : journalFileIds) {
-                DataFile current = journal.getDataFileById(journalFileId);
-                if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
-                    journalToAdvance = journalFileId;
-                    break;
+            try {
+                // Map keys might not be sorted, find the earliest log file to forward acks
+                // from and move only those, future cycles can chip away at more as needed.
+                // We won't move files that are themselves rewritten on a previous compaction.
+                List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
+                Collections.sort(journalFileIds);
+                for (Integer journalFileId : journalFileIds) {
+                    DataFile current = journal.getDataFileById(journalFileId);
+                    if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
+                        journalToAdvance = journalFileId;
+                        break;
+                    }
                 }
-            }

-            // Check if we found one, or if we only found the current file being written to.
-            if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
-                return;
-            }
+                // Check if we found one, or if we only found the current file being written to.
+                if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
+                    return;
+                }

-            Set<Integer> journalLogsReferenced =
-                new HashSet<Integer>(metadata.ackMessageFileMap.get(journalToAdvance));
+                journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));

-            indexLock.writeLock().unlock();
+            } finally {
+                indexLock.writeLock().unlock();
+            }

             try {
                 // Background rewrite of the old acks