Hello,
I'm trying to delete a persistent message from a large queue with
removeMatchingMessages. The code works well until the memory that is used to
browse the queue hits the systemUsage.memoryUsage.limit.
I've changed the LargeQueueSparseDelete test case to reproduce this problem
(see below). In the Queue.java code I can see that doPageIn(true) will never
page in any more messages and the loop in removeMatchingMessages will never
end. It seems that the reference count for the messages in that loop is never
decreased.
Thanks,
Heiko
package org.apache.activemq.usecases;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.ConnectionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Assert;
/**
* This unit test creates a fixed size queue and tries to remove some messages
in the
* queueThe test is used to verify the performance of
* {@link
org.apache.activemq.broker.region.Queue#removeMatchingMessages(org.apache.activemq.broker.ConnectionContext,
String, int)}.
*/
public class LargeQueueLowMemoryRemoveTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG =
LoggerFactory.getLogger(LargeQueueLowMemoryRemoveTest.class);
/**
* {@inheritDoc}
*/
@Override
protected void setUp() throws Exception {
super.useTopic = false;
super.setUp();
}
/**
* @return whether or not persistence should be used
*/
protected boolean isPersistent() {
return true;
}
public void testRemoveMessages() throws Exception {
final int QUEUE_SIZE = 30000;
final int QUEUE_CHUNK_SIZE = 100;
final long TEST_TIMEOUT = 6000;
// Populate a test queue with uniquely-identifiable messages.
Connection conn = createConnection();
// Set the maximum memory to be used by the broker:
this.broker.getSystemUsage().getMemoryUsage().setLimit(10000000);
this.broker.getSystemUsage().getStoreUsage().setLimit( 200000000);
this.broker.getSystemUsage().getTempUsage().setLimit( 300000000);
int count = 0;
conn.start();
while (count < QUEUE_SIZE) {
try {
Session session = conn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer producer =
session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < QUEUE_CHUNK_SIZE && count < QUEUE_SIZE;
i++) {
Message message = session.createMessage();
message.setIntProperty("id", count);
producer.send(message);
count += 1;
}
LOG.info("count = " + count);
session.close();
} finally {
}
}
conn.close();
// Access the implementation of the test queue and remove a message.
Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(
destination);
ConnectionContext context = new ConnectionContext(
new NonCachedMessageEvaluationContext());
context.setBroker(broker.getBroker());
context.getMessageEvaluationContext().setDestination(destination);
long startTimeMillis = System.currentTimeMillis();
Assert.assertEquals(1,
queue.removeMatchingMessages("id=" + (QUEUE_SIZE - 1)));
// Never reached this end, if the memory limit is lower than the queue
size.
long durationMillis = System.currentTimeMillis() - startTimeMillis;
LOG.info("It took " + durationMillis
+ "ms to remove the last message from a queue a " + QUEUE_SIZE
+ " messages.");
}
}