Repository: activemq Updated Branches: refs/heads/activemq-5.10.x fc244f48e -> b41f92127
https://issues.apache.org/jira/browse/AMQ-5277 - jdbc store make use of entryLocator on ack Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7f797046 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7f797046 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7f797046 Branch: refs/heads/activemq-5.10.x Commit: 7f797046951b9cf2ef114582ac8e72ba88989110 Parents: fc244f4 Author: gtully <[email protected]> Authored: Mon Jul 14 16:58:47 2014 +0100 Committer: Hadrian Zbarcea <[email protected]> Committed: Tue Dec 16 21:50:54 2014 -0500 ---------------------------------------------------------------------- .../java/org/apache/activemq/broker/region/BaseDestination.java | 5 +++-- .../java/org/apache/activemq/store/jdbc/JDBCMessageStore.java | 5 ++++- 2 files changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7f797046/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index c3841c8..03513aa 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -782,10 +782,11 @@ public abstract class BaseDestination implements Destination { ack.copy(a); ack = a; // Convert to non-ranged. - ack.setFirstMessageId(node.getMessageId()); - ack.setLastMessageId(node.getMessageId()); ack.setMessageCount(1); } + // always use node messageId so we can access entry/data Location + ack.setFirstMessageId(node.getMessageId()); + ack.setLastMessageId(node.getMessageId()); return ack; } http://git-wip-us.apache.org/repos/asf/activemq/blob/7f797046/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 968b928..76ecced 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -207,7 +207,9 @@ public class JDBCMessageStore extends AbstractMessageStore { public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { - long seq = persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0]; + long seq = ack.getLastMessageId().getEntryLocator() != null ? + (Long) ack.getLastMessageId().getEntryLocator() : + persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0]; // Get a connection and remove the message from the DB TransactionContext c = persistenceAdapter.getTransactionContext(context); @@ -309,6 +311,7 @@ public class JDBCMessageStore extends AbstractMessageStore { public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); + msg.getMessageId().setEntryLocator(sequenceId); listener.recoverMessage(msg); lastRecoveredSequenceId.set(sequenceId); lastRecoveredPriority.set(msg.getPriority());
