AMQ-7067 - ensure updates to ackMessageFileMap are protected by the index lock
(cherry picked from commit a311139bfe2f2b3ffc0c84cfb1e9cec0c11830c7)

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/29fbeb51
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/29fbeb51
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/29fbeb51

Branch: refs/heads/activemq-5.15.x
Commit: 29fbeb511fd419bd4efb7bc87972e89d53d76c09
Parents: 7fa8518
Author: gtully <gary.tu...@gmail.com>
Authored: Tue Oct 9 12:55:11 2018 +0100
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Wed Oct 10 10:23:29 2018 -0400

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java | 27 ++++++++++++++------
 1 file changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/29fbeb51/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 86dfcac..188b021 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1401,6 +1401,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 public void execute(Transaction tx) throws IOException {
                     for (Operation op : messagingTx) {
                         op.execute(tx);
+                        recordAckMessageReferenceLocation(location, op.getLocation());
                     }
                 }
             });
@@ -1408,21 +1409,26 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         } finally {
             indexLock.writeLock().unlock();
         }
-        for (Operation op: inflightTx) {
-            recordAckMessageReferenceLocation(location, op.getLocation());
-        }
     }
 
     @SuppressWarnings("rawtypes")
     protected void process(KahaPrepareCommand command, Location location) {
         TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
+        List<Operation> tx = null;
         synchronized (inflightTransactions) {
-            List<Operation> tx = inflightTransactions.remove(key);
+            tx = inflightTransactions.remove(key);
             if (tx != null) {
                 preparedTransactions.put(key, tx);
-                for (Operation op: tx) {
+            }
+        }
+        if (tx != null && !tx.isEmpty()) {
+            indexLock.writeLock().lock();
+            try {
+                for (Operation op : tx) {
                     recordAckMessageReferenceLocation(location, op.getLocation());
                 }
+            } finally {
+                indexLock.writeLock().unlock();
             }
         }
     }
@@ -1437,9 +1443,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 updates = preparedTransactions.remove(key);
             }
         }
-        if (key.isXATransaction() && updates != null) {
-            for(Operation op : updates) {
-                recordAckMessageReferenceLocation(location, op.getLocation());
+        if (key.isXATransaction() && updates != null && !updates.isEmpty()) {
+            indexLock.writeLock().lock();
+            try {
+                for (Operation op : updates) {
+                    recordAckMessageReferenceLocation(location, op.getLocation());
+                }
+            } finally {
+                indexLock.writeLock().unlock();
             }
         }
     }