Repository: activemq Updated Branches: refs/heads/master b78ef954d -> a2697b844
https://issues.apache.org/jira/browse/AMQ-5853 - track per priority sequence on load from the store. Allow db to select from entire prority 0-9 range. fix and additonal test Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a2697b84 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a2697b84 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a2697b84 Branch: refs/heads/master Commit: a2697b844e1ff61dee6f011ebd08eee59d23f1ca Parents: b78ef95 Author: gtully <gary.tu...@gmail.com> Authored: Mon Jul 6 15:32:23 2015 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Mon Jul 6 15:32:23 2015 +0100 ---------------------------------------------------------------------- .../apache/activemq/store/jdbc/JDBCAdapter.java | 2 +- .../activemq/store/jdbc/JDBCMessageStore.java | 42 +++++++-------- .../apache/activemq/store/jdbc/Statements.java | 14 ++++- .../store/jdbc/adapter/DefaultJDBCAdapter.java | 16 +++--- .../activemq/store/MessagePriorityTest.java | 54 ++++++++++++++++++++ 5 files changed, 95 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a2697b84/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index df4fcf3..e611a01 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -87,7 +87,7 @@ public interface JDBCAdapter { int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException; - void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long maxSeq, long nextSeq, long priority, int maxReturned, boolean isPrioritizeMessages, JDBCMessageRecoveryListener listener) throws Exception; + void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long[] lastRecoveredEntries, long maxSeq, int maxReturned, boolean isPrioritizeMessages, JDBCMessageRecoveryListener listener) throws Exception; long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException; http://git-wip-us.apache.org/repos/asf/activemq/blob/a2697b84/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 2270565..4674d7a 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -18,8 +18,8 @@ package org.apache.activemq.store.jdbc; import java.io.IOException; import java.sql.SQLException; +import java.util.Arrays; import java.util.LinkedList; -import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.ConnectionContext; @@ -66,11 +66,10 @@ public class JDBCMessageStore extends AbstractMessageStore { protected final WireFormat wireFormat; protected final JDBCAdapter adapter; protected final JDBCPersistenceAdapter persistenceAdapter; - protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1); - protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1); protected ActiveMQMessageAudit audit; protected final LinkedList<Long> pendingAdditions = new LinkedList<Long>(); - + final long[] perPriorityLastRecovered = new long[10]; + public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException { super(destination); this.persistenceAdapter = persistenceAdapter; @@ -81,6 +80,7 @@ public class JDBCMessageStore extends AbstractMessageStore { if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) { recordDestinationCreation(destination); } + resetBatching(); } private void recordDestinationCreation(ActiveMQDestination destination) throws IOException { @@ -165,9 +165,6 @@ public class JDBCMessageStore extends AbstractMessageStore { if (xaXid == null) { onAdd(message, sequenceId, message.getPriority()); } - if (this.isPrioritizedMessages() && message.getPriority() > lastRecoveredPriority.get()) { - resetTrackedLastRecoveredPriority(); - } } // jdbc commit order is random with concurrent connections - limit scan to lowest pending @@ -334,9 +331,9 @@ public class JDBCMessageStore extends AbstractMessageStore { TransactionContext c = persistenceAdapter.getTransactionContext(); try { if (LOG.isTraceEnabled()) { - LOG.trace(this + " recoverNext lastRecovered:" + lastRecoveredSequenceId.get() + ", minPending:" + minPendingSequeunceId()); + LOG.trace(this + " recoverNext lastRecovered:" + Arrays.toString(perPriorityLastRecovered) + ", minPending:" + minPendingSequeunceId()); } - adapter.doRecoverNextMessages(c, destination, minPendingSequeunceId(), lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), + adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(), maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() { public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { @@ -344,8 +341,7 @@ public class JDBCMessageStore extends AbstractMessageStore { msg.getMessageId().setBrokerSequenceId(sequenceId); msg.getMessageId().setFutureOrSequenceLong(sequenceId); listener.recoverMessage(msg); - lastRecoveredSequenceId.set(sequenceId); - lastRecoveredPriority.set(msg.getPriority()); + trackLastRecovered(sequenceId, msg.getPriority()); return true; } @@ -366,35 +362,33 @@ public class JDBCMessageStore extends AbstractMessageStore { } + private void trackLastRecovered(long sequenceId, int priority) { + perPriorityLastRecovered[isPrioritizedMessages() ? priority : 0] = sequenceId; + } + /** * @see org.apache.activemq.store.MessageStore#resetBatching() */ public void resetBatching() { if (LOG.isTraceEnabled()) { - LOG.trace(this + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get()); + LOG.trace(this + " resetBatching. last recovered: " + Arrays.toString(perPriorityLastRecovered)); + } + for (int i=0;i<perPriorityLastRecovered.length;i++) { + perPriorityLastRecovered[i] = -1; } - lastRecoveredSequenceId.set(-1); - resetTrackedLastRecoveredPriority(); - } - private final void resetTrackedLastRecoveredPriority() { - lastRecoveredPriority.set(Byte.MAX_VALUE - 1); - } @Override public void setBatch(MessageId messageId) { try { long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(null, messageId, destination); - lastRecoveredSequenceId.set(storedValues[0]); - lastRecoveredPriority.set(storedValues[1]); + trackLastRecovered(storedValues[0], (int)storedValues[1]); } catch (IOException ignoredAsAlreadyLogged) { - lastRecoveredSequenceId.set(-1); - lastRecoveredPriority.set(Byte.MAX_VALUE -1); + resetBatching(); } if (LOG.isTraceEnabled()) { - LOG.trace(this + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get() - + ", priority: " + lastRecoveredPriority.get()); + LOG.trace(this + " setBatch: new last recovered: " + Arrays.toString(perPriorityLastRecovered)); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/a2697b84/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java index 1afe0e3..7bc6df5 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -491,7 +491,7 @@ public class Statements { public String getFindNextMessagesStatement() { if (findNextMessagesStatement == null) { findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName() - + " WHERE CONTAINER=? AND ID > ? AND ID < ? AND XID IS NULL ORDER BY ID"; + + " WHERE CONTAINER=? AND ID < ? AND ID > ? AND XID IS NULL ORDER BY ID"; } return findNextMessagesStatement; } @@ -504,7 +504,17 @@ public class Statements { findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName() + " WHERE CONTAINER=?" + " AND XID IS NULL" - + " AND ((ID > ? AND ID < ? AND PRIORITY = ?) OR PRIORITY < ?)" + + " AND ID < ? " + + " AND ( (ID > ? AND PRIORITY = 9) " + + " OR (ID > ? AND PRIORITY = 8) " + + " OR (ID > ? AND PRIORITY = 7) " + + " OR (ID > ? AND PRIORITY = 6) " + + " OR (ID > ? AND PRIORITY = 5) " + + " OR (ID > ? AND PRIORITY = 4) " + + " OR (ID > ? AND PRIORITY = 3) " + + " OR (ID > ? AND PRIORITY = 2) " + + " OR (ID > ? AND PRIORITY = 1) " + + " OR (ID > ? AND PRIORITY = 0) )" + " ORDER BY PRIORITY DESC, ID"; } return findNextMessagesByPriorityStatement; http://git-wip-us.apache.org/repos/asf/activemq/blob/a2697b84/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index c77f951..9c6f3bf 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -37,6 +37,7 @@ import org.apache.activemq.command.XATransactionId; import org.apache.activemq.store.jdbc.JDBCAdapter; import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; +import org.apache.activemq.store.jdbc.JDBCMessageStore; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore; import org.apache.activemq.store.jdbc.Statements; @@ -1086,8 +1087,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter { return result; } - public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long maxSeq, long lastRecoveredSeq, - long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception { + public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long[] lastRecoveredEntries, + long maxSeq, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); @@ -1099,11 +1100,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } s.setMaxRows(Math.min(maxReturned, maxRows)); s.setString(1, destination.getQualifiedName()); - s.setLong(2, lastRecoveredSeq); - s.setLong(3, maxSeq); + s.setLong(2, maxSeq); + int paramId = 3; if (isPrioritizedMessages) { - s.setLong(4, priority); - s.setLong(5, priority); + for (int i=9;i>=0;i--) { + s.setLong(paramId++, lastRecoveredEntries[i]); + } + } else { + s.setLong(paramId, lastRecoveredEntries[0]); } rs = s.executeQuery(); int count = 0; http://git-wip-us.apache.org/repos/asf/activemq/blob/a2697b84/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java index adee523..e1d4b09 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java @@ -17,10 +17,12 @@ package org.apache.activemq.store; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; @@ -664,4 +666,56 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { } queueConsumer.close(); } + + public void initCombosForTestEveryXHi() { + // the cache limits the priority ordering to available memory + addCombinationValues("useCache", new Object[] {new Boolean(false)}); + // expiry processing can fill the cursor with a snapshot of the producer + // priority, before producers are complete + addCombinationValues("expireMessagePeriod", new Object[] {new Integer(0)}); + } + + public void testEveryXHi() throws Exception { + final int numMessages = 50; + ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST_LOW_THEN_HIGH_10"); + + final AtomicInteger received = new AtomicInteger(0); + MessageConsumer queueConsumer = sess.createConsumer(queue); + queueConsumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + received.incrementAndGet(); + } + }); + + MessageProducer producer = sess.createProducer(queue); + for (int i = 0; i < numMessages; i++) { + Message message = sess.createMessage(); + if (i % 5 == 0) { + message.setJMSPriority(9); + } else { + message.setJMSPriority(4); + } + producer.send(message, Message.DEFAULT_DELIVERY_MODE, message.getJMSPriority(), Message.DEFAULT_TIME_TO_LIVE); + } + + assertTrue("Got all", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return numMessages == received.get(); + } + })); + + + final DestinationStatistics destinationStatistics = ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics(); + assertTrue("Nothing else Like dlq involved", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("Enqueues: " + destinationStatistics.getEnqueues().getCount() + ", Dequeues: " + destinationStatistics.getDequeues().getCount()); + return destinationStatistics.getEnqueues().getCount() == numMessages && destinationStatistics.getDequeues().getCount() == numMessages; + } + }, 10000)); + + queueConsumer.close(); + } }