Author: gtully
Date: Mon Oct 13 08:24:31 2008
New Revision: 704142
URL: http://svn.apache.org/viewvc?rev=704142&view=rev
Log:
fix AMQ-1970 - pagedInMessages in slave was being filled due to more than
200(pageSize) unacked messages and slave not modifying the inflight count which
is used in the pageIn logic
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=704142&r1=704141&r2=704142&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Oct 13 08:24:31 2008
@@ -163,6 +163,7 @@
pending.remove();
createMessageDispatch(node, node.getMessage());
dispatched.add(node);
+ onDispatch(node, node.getMessage());
}
return;
}
@@ -173,7 +174,7 @@
}
throw new JMSException(
"Slave broker out of sync with master: Dispatched message ("
- + mdn.getMessageId() + ") was not in the pending
list");
+ + mdn.getMessageId() + ") was not in the pending list
for " + mdn.getDestination().getPhysicalName());
}
public final void acknowledge(final ConnectionContext context,final
MessageAck ack) throws Exception {
@@ -205,9 +206,7 @@
if (!this.getConsumerInfo().isBrowser()) {
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
}
- if (!isSlave()) {
-
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
- }
+
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
} else {
// setup a Synchronization to remove nodes from the
// dispatched list.
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java?rev=704142&r1=704141&r2=704142&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
Mon Oct 13 08:24:31 2008
@@ -16,14 +16,26 @@
*/
package org.apache.activemq.advisory;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
+
public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
String masterBindAddress = "tcp://localhost:61616";
String slaveBindAddress = "tcp://localhost:62616";
BrokerService slave;
-
+
/*
* add a slave broker
* @see org.apache.activemq.EmbeddedBrokerTestSupport#createBroker()
@@ -93,4 +105,46 @@
masterRb.getDestinationStatistics().getDispatched().getCount());
}
+ public void testMoreThanPageSizeUnacked() throws Exception {
+
+ final int messageCount = Queue.MAX_PAGE_SIZE + 10;
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ serverSession = serverConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ ActiveMQSession s = (ActiveMQSession) serverSession;
+ s.setSessionAsyncDispatch(true);
+
+ MessageConsumer serverConsumer =
serverSession.createConsumer(serverDestination);
+ serverConsumer.setMessageListener(new MessageListener() {
+
+ public void onMessage(Message msg) {
+ try {
+ latch.await(30L, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ });
+
+ MessageProducer producer =
clientSession.createProducer(serverDestination);
+ for (int i =0; i< messageCount; i++) {
+ Message msg = clientSession.createMessage();
+ producer.send(msg);
+ }
+
+ RegionBroker slaveRb = (RegionBroker) slave.getBroker().getAdaptor(
+ RegionBroker.class);
+ RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor(
+ RegionBroker.class);
+
+ Thread.sleep(4000);
+ assertEquals("inflight match expected", messageCount,
masterRb.getDestinationStatistics().getInflight().getCount());
+ assertEquals("inflight match on slave and master",
slaveRb.getDestinationStatistics().getInflight().getCount(),
masterRb.getDestinationStatistics().getInflight().getCount());
+
+ latch.countDown();
+ Thread.sleep(4000);
+ assertEquals("inflight match expected", 0,
masterRb.getDestinationStatistics().getInflight().getCount());
+ assertEquals("inflight match on slave and master",
slaveRb.getDestinationStatistics().getInflight().getCount(),
masterRb.getDestinationStatistics().getInflight().getCount());
+ }
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java?rev=704142&r1=704141&r2=704142&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
Mon Oct 13 08:24:31 2008
@@ -32,11 +32,11 @@
* @version $Revision: 397249 $
*/
public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
- private Connection serverConnection;
- private Session serverSession;
- private Connection clientConnection;
- private Session clientSession;
- private Destination serverDestination;
+ protected Connection serverConnection;
+ protected Session serverSession;
+ protected Connection clientConnection;
+ protected Session clientSession;
+ protected Destination serverDestination;
protected static final int COUNT = 2000;
public void testLoadRequestReply() throws Exception {