Author: tabish
Date: Thu May 31 18:14:07 2012
New Revision: 1344842
URL: http://svn.apache.org/viewvc?rev=1344842&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3871
Fix the addMessageFirst and values methods in OrderedPendingList and add tests
for both OrderedPendingList and PrioritizedPendingList along with a test case
to show the bad behavior for re-delivered in-flight non-persistent messages
with durable subscribers.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1344842&r1=1344841&r2=1344842&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Thu May 31 18:14:07 2012
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -61,7 +62,6 @@ public class DurableTopicSubscription ex
this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(),
info.getSubscriptionName());
-
}
public final boolean isActive() {
@@ -180,6 +180,10 @@ public class DurableTopicSubscription ex
}
}
+ // Before we add these back to pending they need to be in producer order not
+ // dispatch order so we can add them to the front of the pending list.
+ Collections.reverse(dispatched);
+
for (final MessageReference node : dispatched) {
// Mark the dispatched messages as redelivered for next time.
Integer count = redeliveredMessages.get(node.getMessageId());
@@ -195,6 +199,7 @@ public class DurableTopicSubscription ex
node.decrementReferenceCount();
}
}
+
dispatched.clear();
}
if (!keepDurableSubsActive && pending.isTransient()) {
@@ -213,7 +218,6 @@ public class DurableTopicSubscription ex
prefetchExtension.set(0);
}
-
protected MessageDispatch createMessageDispatch(MessageReference node,
Message message) {
MessageDispatch md = super.createMessageDispatch(node, message);
if (node != QueueMessageReference.NULL_MESSAGE) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java?rev=1344842&r1=1344841&r2=1344842&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
Thu May 31 18:14:07 2012
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
@@ -38,6 +39,7 @@ public class OrderedPendingList implemen
tail = node;
} else {
root.linkBefore(node);
+ root = node;
}
this.map.put(message.getMessageId(), node);
return node;
@@ -134,11 +136,14 @@ public class OrderedPendingList implemen
@Override
public boolean contains(MessageReference message) {
- if(map.values().contains(message)) {
- return true;
- } else {
- return false;
+ if (message != null) {
+ for (PendingNode value : map.values()) {
+ if (value.getMessage().equals(message)) {
+ return true;
+ }
+ }
}
+ return false;
}
@Override
@@ -152,8 +157,10 @@ public class OrderedPendingList implemen
@Override
public void addAll(PendingList pendingList) {
- for(MessageReference messageReference : pendingList) {
- addMessageLast(messageReference);
+ if (pendingList != null) {
+ for(MessageReference messageReference : pendingList) {
+ addMessageLast(messageReference);
+ }
}
}
}
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java?rev=1344842&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
Thu May 31 18:14:07 2012
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.util.IdGenerator;
+import org.junit.Test;
+
+public class OrderPendingListTest {
+
+ @Test
+ public void testAddMessageFirst() throws Exception {
+
+ OrderedPendingList list = new OrderedPendingList();
+
+ list.addMessageFirst(new TestMessageReference(1));
+ list.addMessageFirst(new TestMessageReference(2));
+ list.addMessageFirst(new TestMessageReference(3));
+ list.addMessageFirst(new TestMessageReference(4));
+ list.addMessageFirst(new TestMessageReference(5));
+
+ assertTrue(list.size() == 5);
+ assertEquals(5, list.getAsList().size());
+
+ Iterator<MessageReference> iter = list.iterator();
+ int lastId = list.size();
+ while (iter.hasNext()) {
+ assertEquals(lastId--, iter.next().getMessageId().getProducerSequenceId());
+ }
+ }
+
+ @Test
+ public void testAddMessageLast() throws Exception {
+
+ OrderedPendingList list = new OrderedPendingList();
+
+ list.addMessageLast(new TestMessageReference(1));
+ list.addMessageLast(new TestMessageReference(2));
+ list.addMessageLast(new TestMessageReference(3));
+ list.addMessageLast(new TestMessageReference(4));
+ list.addMessageLast(new TestMessageReference(5));
+
+ assertTrue(list.size() == 5);
+ assertEquals(5, list.getAsList().size());
+
+ Iterator<MessageReference> iter = list.iterator();
+ int lastId = 1;
+ while (iter.hasNext()) {
+ assertEquals(lastId++, iter.next().getMessageId().getProducerSequenceId());
+ }
+ }
+
+ @Test
+ public void testClear() throws Exception {
+ OrderedPendingList list = new OrderedPendingList();
+
+ list.addMessageFirst(new TestMessageReference(1));
+ list.addMessageFirst(new TestMessageReference(2));
+ list.addMessageFirst(new TestMessageReference(3));
+ list.addMessageFirst(new TestMessageReference(4));
+ list.addMessageFirst(new TestMessageReference(5));
+
+ assertFalse(list.isEmpty());
+ assertTrue(list.size() == 5);
+ assertEquals(5, list.getAsList().size());
+
+ list.clear();
+
+ assertTrue(list.isEmpty());
+ assertTrue(list.size() == 0);
+ assertEquals(0, list.getAsList().size());
+
+ list.addMessageFirst(new TestMessageReference(1));
+ list.addMessageLast(new TestMessageReference(2));
+ list.addMessageLast(new TestMessageReference(3));
+ list.addMessageFirst(new TestMessageReference(4));
+ list.addMessageLast(new TestMessageReference(5));
+
+ assertFalse(list.isEmpty());
+ assertTrue(list.size() == 5);
+ assertEquals(5, list.getAsList().size());
+ }
+
+ @Test
+ public void testIsEmpty() throws Exception {
+ OrderedPendingList list = new OrderedPendingList();
+ assertTrue(list.isEmpty());
+
+ list.addMessageFirst(new TestMessageReference(1));
+ list.addMessageFirst(new TestMessageReference(2));
+ list.addMessageFirst(new TestMessageReference(3));
+ list.addMessageFirst(new TestMessageReference(4));
+ list.addMessageFirst(new TestMessageReference(5));
+
+ assertFalse(list.isEmpty());
+ list.clear();
+ assertTrue(list.isEmpty());
+ }
+
+ @Test
+ public void testSize() {
+ OrderedPendingList list = new OrderedPendingList();
+ assertTrue(list.isEmpty());
+
+ assertTrue(list.size() == 0);
+ list.addMessageFirst(new TestMessageReference(1));
+ assertTrue(list.size() == 1);
+ list.addMessageLast(new TestMessageReference(2));
+ assertTrue(list.size() == 2);
+ list.addMessageFirst(new TestMessageReference(3));
+ assertTrue(list.size() == 3);
+ list.addMessageLast(new TestMessageReference(4));
+ assertTrue(list.size() == 4);
+ list.addMessageFirst(new TestMessageReference(5));
+ assertTrue(list.size() == 5);
+
+ assertFalse(list.isEmpty());
+ list.clear();
+ assertTrue(list.isEmpty());
+ assertTrue(list.size() == 0);
+ }
+
+ @Test
+ public void testRemove() throws Exception {
+
+ OrderedPendingList list = new OrderedPendingList();
+
+ TestMessageReference toRemove = new TestMessageReference(6);
+
+ list.addMessageFirst(new TestMessageReference(1));
+ list.addMessageFirst(new TestMessageReference(2));
+ list.addMessageFirst(new TestMessageReference(3));
+ list.addMessageFirst(new TestMessageReference(4));
+ list.addMessageFirst(new TestMessageReference(5));
+
+ assertTrue(list.size() == 5);
+ assertEquals(5, list.getAsList().size());
+
+ list.addMessageLast(toRemove);
+ list.remove(toRemove);
+
+ assertTrue(list.size() == 5);
+ assertEquals(5, list.getAsList().size());
+
+ list.remove(toRemove);
+
+ assertTrue(list.size() == 5);
+ assertEquals(5, list.getAsList().size());
+
+ Iterator<MessageReference> iter = list.iterator();
+ int lastId = list.size();
+ while (iter.hasNext()) {
+ assertEquals(lastId--, iter.next().getMessageId().getProducerSequenceId());
+ }
+
+ list.remove(null);
+ }
+
+ @Test
+ public void testContains() throws Exception {
+
+ OrderedPendingList list = new OrderedPendingList();
+
+ TestMessageReference toRemove = new TestMessageReference(6);
+
+ assertFalse(list.contains(toRemove));
+ assertFalse(list.contains(null));
+
+ list.addMessageFirst(new TestMessageReference(1));
+ list.addMessageFirst(new TestMessageReference(2));
+ list.addMessageFirst(new TestMessageReference(3));
+ list.addMessageFirst(new TestMessageReference(4));
+ list.addMessageFirst(new TestMessageReference(5));
+
+ assertTrue(list.size() == 5);
+ assertEquals(5, list.getAsList().size());
+
+ list.addMessageLast(toRemove);
+ assertTrue(list.size() == 6);
+ assertTrue(list.contains(toRemove));
+ list.remove(toRemove);
+ assertFalse(list.contains(toRemove));
+
+ assertTrue(list.size() == 5);
+ assertEquals(5, list.getAsList().size());
+ }
+
+ @Test
+ public void testValues() throws Exception {
+
+ OrderedPendingList list = new OrderedPendingList();
+
+ TestMessageReference toRemove = new TestMessageReference(6);
+
+ assertFalse(list.contains(toRemove));
+
+ list.addMessageFirst(new TestMessageReference(1));
+ list.addMessageFirst(new TestMessageReference(2));
+ list.addMessageFirst(new TestMessageReference(3));
+ list.addMessageFirst(new TestMessageReference(4));
+ list.addMessageFirst(new TestMessageReference(5));
+
+ Collection<MessageReference> values = list.values();
+ assertEquals(5, values.size());
+
+ for (MessageReference msg : values) {
+ assertTrue(values.contains(msg));
+ }
+
+ assertFalse(values.contains(toRemove));
+
+ list.addMessageLast(toRemove);
+ values = list.values();
+ assertEquals(6, values.size());
+ for (MessageReference msg : values) {
+ assertTrue(values.contains(msg));
+ }
+
+ assertTrue(values.contains(toRemove));
+ }
+
+ @Test
+ public void testAddAll() throws Exception {
+ OrderedPendingList list = new OrderedPendingList();
+ TestPendingList source = new TestPendingList();
+
+ source.addMessageFirst(new TestMessageReference(1));
+ source.addMessageFirst(new TestMessageReference(2));
+ source.addMessageFirst(new TestMessageReference(3));
+ source.addMessageFirst(new TestMessageReference(4));
+ source.addMessageFirst(new TestMessageReference(5));
+
+ assertTrue(list.isEmpty());
+ assertEquals(5, source.size());
+ list.addAll(source);
+ assertEquals(5, list.size());
+
+ for (MessageReference message : source) {
+ assertTrue(list.contains(message));
+ }
+
+ list.addAll(null);
+ }
+
+ static class TestPendingList implements PendingList {
+
+ private final LinkedList<MessageReference> theList = new LinkedList<MessageReference>();
+
+ @Override
+ public boolean isEmpty() {
+ return theList.isEmpty();
+ }
+
+ @Override
+ public void clear() {
+ theList.clear();
+ }
+
+ @Override
+ public PendingNode addMessageFirst(MessageReference message) {
+ theList.addFirst(message);
+ return new PendingNode(null, message);
+ }
+
+ @Override
+ public PendingNode addMessageLast(MessageReference message) {
+ theList.addLast(message);
+ return new PendingNode(null, message);
+ }
+
+ @Override
+ public PendingNode remove(MessageReference message) {
+ if (theList.remove(message)) {
+ return new PendingNode(null, message);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public int size() {
+ return theList.size();
+ }
+
+ @Override
+ public Iterator<MessageReference> iterator() {
+ return theList.iterator();
+ }
+
+ @Override
+ public boolean contains(MessageReference message) {
+ return theList.contains(message);
+ }
+
+ @Override
+ public Collection<MessageReference> values() {
+ return theList;
+ }
+
+ @Override
+ public void addAll(PendingList pendingList) {
+ for(MessageReference messageReference : pendingList) {
+ theList.add(messageReference);
+ }
+ }
+ }
+
+ static class TestMessageReference implements MessageReference {
+
+ private static final IdGenerator id = new IdGenerator();
+
+ private MessageId messageId;
+ private int referenceCount = 0;
+
+ public TestMessageReference(int sequenceId) {
+ messageId = new MessageId(id.generateId() + ":1", sequenceId);
+ }
+
+ @Override
+ public MessageId getMessageId() {
+ return messageId;
+ }
+
+ @Override
+ public Message getMessageHardRef() {
+ return null;
+ }
+
+ @Override
+ public Message getMessage() {
+ return null;
+ }
+
+ @Override
+ public boolean isPersistent() {
+ return false;
+ }
+
+ @Override
+ public Destination getRegionDestination() {
+ return null;
+ }
+
+ @Override
+ public int getRedeliveryCounter() {
+ return 0;
+ }
+
+ @Override
+ public void incrementRedeliveryCounter() {
+ }
+
+ @Override
+ public int getReferenceCount() {
+ return this.referenceCount;
+ }
+
+ @Override
+ public int incrementReferenceCount() {
+ return this.referenceCount++;
+ }
+
+ @Override
+ public int decrementReferenceCount() {
+ return this.referenceCount--;
+ }
+
+ @Override
+ public ConsumerId getTargetConsumerId() {
+ return null;
+ }
+
+ @Override
+ public int getSize() {
+ return 1;
+ }
+
+ @Override
+ public long getExpiration() {
+ return 0;
+ }
+
+ @Override
+ public String getGroupID() {
+ return null;
+ }
+
+ @Override
+ public int getGroupSequence() {
+ return 0;
+ }
+
+ @Override
+ public boolean isExpired() {
+ return false;
+ }
+
+ @Override
+ public boolean isDropped() {
+ return false;
+ }
+
+ @Override
+ public boolean isAdvisory() {
+ return false;
+ }
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java?rev=1344842&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
Thu May 31 18:14:07 2012
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.util.IdGenerator;
+import org.junit.Test;
+
+public class PrioritizedPendingListTest {
+
+ @Test
+ public void testAddMessageFirst() {
+ PrioritizedPendingList list = new PrioritizedPendingList();
+
+ list.addMessageFirst(new TestMessageReference(1));
+ list.addMessageFirst(new TestMessageReference(2));
+ list.addMessageFirst(new TestMessageReference(3));
+ list.addMessageFirst(new TestMessageReference(4));
+ list.addMessageFirst(new TestMessageReference(5));
+
+ assertTrue(list.size() == 5);
+
+ Iterator<MessageReference> iter = list.iterator();
+ int lastId = list.size();
+ while (iter.hasNext()) {
+ assertEquals(lastId--, iter.next().getMessageId().getProducerSequenceId());
+ }
+ }
+
+ @Test
+ public void testAddMessageLast() {
+
+ PrioritizedPendingList list = new PrioritizedPendingList();
+
+ list.addMessageLast(new TestMessageReference(1));
+ list.addMessageLast(new TestMessageReference(2));
+ list.addMessageLast(new TestMessageReference(3));
+ list.addMessageLast(new TestMessageReference(4));
+ list.addMessageLast(new TestMessageReference(5));
+
+ assertTrue(list.size() == 5);
+
+ Iterator<MessageReference> iter = list.iterator();
+ int lastId = 1;
+ while (iter.hasNext()) {
+ assertEquals(lastId++, iter.next().getMessageId().getProducerSequenceId());
+ }
+ }
+
+ @Test
+ public void testClear() {
+ PrioritizedPendingList list = new PrioritizedPendingList();
+
+ list.addMessageFirst(new TestMessageReference(1));
+ list.addMessageFirst(new TestMessageReference(2));
+ list.addMessageFirst(new TestMessageReference(3));
+ list.addMessageFirst(new TestMessageReference(4));
+ list.addMessageFirst(new TestMessageReference(5));
+
+ assertFalse(list.isEmpty());
+ assertTrue(list.size() == 5);
+
+ list.clear();
+
+ assertTrue(list.isEmpty());
+ assertTrue(list.size() == 0);
+
+ list.addMessageFirst(new TestMessageReference(1));
+ list.addMessageLast(new TestMessageReference(2));
+ list.addMessageLast(new TestMessageReference(3));
+ list.addMessageFirst(new TestMessageReference(4));
+ list.addMessageLast(new TestMessageReference(5));
+
+ assertFalse(list.isEmpty());
+ assertTrue(list.size() == 5);
+ }
+
+ @Test
+ public void testIsEmpty() {
+ PrioritizedPendingList list = new PrioritizedPendingList();
+ assertTrue(list.isEmpty());
+
+ list.addMessageFirst(new TestMessageReference(1));
+ list.addMessageFirst(new TestMessageReference(2));
+ list.addMessageFirst(new TestMessageReference(3));
+ list.addMessageFirst(new TestMessageReference(4));
+ list.addMessageFirst(new TestMessageReference(5));
+
+ assertFalse(list.isEmpty());
+ list.clear();
+ assertTrue(list.isEmpty());
+ }
+
+ @Test
+ public void testRemove() {
+ PrioritizedPendingList list = new PrioritizedPendingList();
+
+ TestMessageReference toRemove = new TestMessageReference(6);
+
+ list.addMessageFirst(new TestMessageReference(1));
+ list.addMessageFirst(new TestMessageReference(2));
+ list.addMessageFirst(new TestMessageReference(3));
+ list.addMessageFirst(new TestMessageReference(4));
+ list.addMessageFirst(new TestMessageReference(5));
+
+ assertTrue(list.size() == 5);
+
+ list.addMessageLast(toRemove);
+ list.remove(toRemove);
+
+ assertTrue(list.size() == 5);
+
+ list.remove(toRemove);
+
+ assertTrue(list.size() == 5);
+
+ Iterator<MessageReference> iter = list.iterator();
+ int lastId = list.size();
+ while (iter.hasNext()) {
+ assertEquals(lastId--, iter.next().getMessageId().getProducerSequenceId());
+ }
+
+ list.remove(null);
+ }
+
+ @Test
+ public void testSize() {
+ PrioritizedPendingList list = new PrioritizedPendingList();
+ assertTrue(list.isEmpty());
+
+ assertTrue(list.size() == 0);
+ list.addMessageFirst(new TestMessageReference(1));
+ assertTrue(list.size() == 1);
+ list.addMessageLast(new TestMessageReference(2));
+ assertTrue(list.size() == 2);
+ list.addMessageFirst(new TestMessageReference(3));
+ assertTrue(list.size() == 3);
+ list.addMessageLast(new TestMessageReference(4));
+ assertTrue(list.size() == 4);
+ list.addMessageFirst(new TestMessageReference(5));
+ assertTrue(list.size() == 5);
+
+ assertFalse(list.isEmpty());
+ list.clear();
+ assertTrue(list.isEmpty());
+ assertTrue(list.size() == 0);
+ }
+
+ @Test
+ public void testPrioritization() {
+ PrioritizedPendingList list = new PrioritizedPendingList();
+
+ list.addMessageFirst(new TestMessageReference(1, 5));
+ list.addMessageFirst(new TestMessageReference(2, 4));
+ list.addMessageFirst(new TestMessageReference(3, 3));
+ list.addMessageFirst(new TestMessageReference(4, 2));
+ list.addMessageFirst(new TestMessageReference(5, 1));
+
+ assertTrue(list.size() == 5);
+
+ Iterator<MessageReference> iter = list.iterator();
+ int lastId = list.size();
+ while (iter.hasNext()) {
+ assertEquals(lastId--, iter.next().getMessage().getPriority());
+ }
+ }
+
+ static class TestMessageReference implements MessageReference {
+
+ private static final IdGenerator id = new IdGenerator();
+
+ private Message message;
+ private MessageId messageId;
+ private int referenceCount = 0;
+
+ public TestMessageReference(int sequenceId) {
+ messageId = new MessageId(id.generateId() + ":1", sequenceId);
+ message = new ActiveMQMessage();
+ message.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
+ }
+
+ public TestMessageReference(int sequenceId, int priority) {
+ messageId = new MessageId(id.generateId() + ":1", sequenceId);
+ message = new ActiveMQMessage();
+ message.setPriority((byte) priority);
+ }
+
+ @Override
+ public MessageId getMessageId() {
+ return messageId;
+ }
+
+ @Override
+ public Message getMessageHardRef() {
+ return null;
+ }
+
+ @Override
+ public Message getMessage() {
+ return message;
+ }
+
+ @Override
+ public boolean isPersistent() {
+ return false;
+ }
+
+ @Override
+ public Destination getRegionDestination() {
+ return null;
+ }
+
+ @Override
+ public int getRedeliveryCounter() {
+ return 0;
+ }
+
+ @Override
+ public void incrementRedeliveryCounter() {
+ }
+
+ @Override
+ public int getReferenceCount() {
+ return this.referenceCount;
+ }
+
+ @Override
+ public int incrementReferenceCount() {
+ return this.referenceCount++;
+ }
+
+ @Override
+ public int decrementReferenceCount() {
+ return this.referenceCount--;
+ }
+
+ @Override
+ public ConsumerId getTargetConsumerId() {
+ return null;
+ }
+
+ @Override
+ public int getSize() {
+ return 1;
+ }
+
+ @Override
+ public long getExpiration() {
+ return 0;
+ }
+
+ @Override
+ public String getGroupID() {
+ return null;
+ }
+
+ @Override
+ public int getGroupSequence() {
+ return 0;
+ }
+
+ @Override
+ public boolean isExpired() {
+ return false;
+ }
+
+ @Override
+ public boolean isDropped() {
+ return false;
+ }
+
+ @Override
+ public boolean isAdvisory() {
+ return false;
+ }
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest.java?rev=1344842&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest.java
Thu May 31 18:14:07 2012
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.usecases;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Use case test for a durable topic subscriber receiving non-persistent
+ * messages (see AMQ-3871).
+ * <p>
+ * A durable subscriber is registered and disconnected, non-persistent messages
+ * are published to the topic, and the subscriber then reconnects twice to drain
+ * them. Afterwards the subscription's JMX view is polled until it reports an
+ * empty pending queue and zero cursor memory usage.
+ */
+public class DurableSubscriberNonPersistentMessageTest extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriberNonPersistentMessageTest.class);
+
+    private String brokerURL = "failover:(tcp://localhost:61616)";
+    private String consumerBrokerURL = brokerURL + "?jms.prefetchPolicy.all=100";
+
+    // Number of messages the first reconnect attempts to consume.
+    int initialMaxMsgs = 10;
+    // Number of messages the second reconnect attempts to consume.
+    int cleanupMsgCount = 10;
+    // Number of messages published by the producer.
+    int totalMsgCount = initialMaxMsgs + cleanupMsgCount;
+    // Running total of messages received across all consumer sessions.
+    int totalMsgReceived = 0;
+    // Pause, in milliseconds, after each receive attempt.
+    int sleep = 500;
+    // Pause, in milliseconds, between the two draining consumer sessions.
+    int reconnectSleep = 2000;
+    // Receive timeout, in milliseconds, for a single message.
+    int messageTimeout = 1000;
+    // Length, in characters, of each generated message body.
+    int messageSize = 1024;
+
+    // Note: If ttl is set 0, the default set by the broker will be used if any
+    // setting a value greater than 0 will enable the producer to set the ttl on
+    // the message
+    long ttl = 0;
+
+    static String clientId = "Jason";
+    MBeanServer mbeanServer;
+
+    BrokerService broker;
+
+    /**
+     * Starts a KahaDB-backed broker listening on tcp://localhost:61616 and
+     * looks up the platform MBean server used for the JMX assertions.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = new BrokerService();
+        broker.addConnector("tcp://localhost:61616");
+        KahaDBStore store = new KahaDBStore();
+        store.setDirectory(new File("data"));
+        broker.setPersistenceAdapter(store);
+        broker.start();
+
+        mbeanServer = ManagementFactory.getPlatformMBeanServer();
+    }
+
+    /**
+     * Stops the broker and waits for the shutdown to finish, so that the fixed
+     * port is released before another test tries to bind it.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            // setUp may have failed before the broker was created.
+            if (broker != null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Create the test case
+     *
+     * @param testName
+     *            name of the test case
+     */
+    public DurableSubscriberNonPersistentMessageTest(String testName) {
+        super(testName);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(DurableSubscriberNonPersistentMessageTest.class);
+    }
+
+    /**
+     * Publishes non-persistent messages to an offline durable subscriber, lets
+     * the subscriber drain them over two sessions, and then verifies through
+     * JMX that nothing is left pending for the subscription.
+     *
+     * @throws Exception
+     *             on any JMS or JMX failure; the exception is propagated so the
+     *             test runner reports the real cause and stack trace
+     */
+    public void testDurableSubscriberNonPersistentMessage() throws Exception {
+        String interest = "TEST";
+
+        LOG.info("Starting DurableSubscriberNonPersistentMessageTest");
+
+        // create durable topic consumer and disconnect
+        createConsumer(interest, 0);
+        Thread.sleep(1000);
+
+        // produce totalMsgCount non-persistent messages to topic
+        Producer producer = new Producer(brokerURL, interest, messageSize, ttl);
+        try {
+            producer.sendMessages(totalMsgCount);
+        } finally {
+            // Release the connection even if sending fails part-way.
+            producer.close();
+        }
+        LOG.info(totalMsgCount + " messages sent");
+
+        // durable topic consumer will consume initialMaxMsgs messages and
+        // disconnect
+        createConsumer(interest, initialMaxMsgs);
+
+        Thread.sleep(reconnectSleep);
+
+        createConsumer(interest, cleanupMsgCount);
+
+        String brokerVersion = (String) mbeanServer.getAttribute(
+                new ObjectName("org.apache.activemq:BrokerName=localhost,Type=Broker"), "BrokerVersion");
+
+        LOG.info("Test run on: " + brokerVersion);
+        // Fuse and Apache 5.6 use different object strings if the consumer
+        // is offline, maybe this has something to do with the difference in
+        // behavior?
+        String jmxObject = "org.apache.activemq:BrokerName=localhost,Type=Subscription,active=false,name=Jason_MyDurableTopic";
+        if (brokerVersion == null || brokerVersion.contains("fuse") || brokerVersion.contains("5.6")) {
+            jmxObject = "org.apache.activemq:BrokerName=localhost,Type=Subscription,persistentMode=Durable,subscriptionID=MyDurableTopic,destinationType=Topic,destinationName=TEST,clientId=Jason";
+        }
+
+        final String theJmxObject = jmxObject;
+
+        assertTrue("pendingQueueSize should be zero", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Integer pendingQueueSize = (Integer) mbeanServer.getAttribute(new ObjectName(theJmxObject), "PendingQueueSize");
+                LOG.info("pendingQueueSize = " + pendingQueueSize);
+                return pendingQueueSize.intValue() == 0;
+            }
+        }));
+
+        assertTrue("cursorMemoryUsage should be zero", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Long cursorMemoryUsage = (Long) mbeanServer.getAttribute(new ObjectName(theJmxObject), "CursorMemoryUsage");
+                LOG.info("cursorMemoryUsage = " + cursorMemoryUsage);
+                return cursorMemoryUsage.longValue() == 0L;
+            }
+        }));
+
+        // Not sure what the behavior should be here, if the messages
+        // expired the received count shouldn't equal total message count
+        assertEquals("number of messages received", initialMaxMsgs + cleanupMsgCount, totalMsgReceived);
+    }
+
+    /**
+     * Connects the durable subscriber, attempts to receive up to
+     * {@code maxMsgs} messages (pausing {@link #sleep} milliseconds after each
+     * attempt) and disconnects again. Every message received is added to
+     * {@link #totalMsgReceived}.
+     *
+     * @param interest
+     *            name of the topic to subscribe to
+     * @param maxMsgs
+     *            number of receive attempts to make
+     */
+    public void createConsumer(String interest, int maxMsgs) {
+        int messageReceived = 0;
+        int messagesNotReceived = 0;
+
+        LOG.info("Starting DurableSubscriber");
+
+        Consumer consumer = null;
+
+        try {
+            consumer = new Consumer(consumerBrokerURL, interest, clientId);
+
+            for (int i = 0; i < maxMsgs; i++) {
+                Message msg = consumer.getMessage(messageTimeout);
+                if (msg != null) {
+                    LOG.debug("Received Message: " + msg.toString());
+                    messageReceived++;
+                    totalMsgReceived++;
+                } else {
+                    LOG.debug("message " + i + " not received");
+                    messagesNotReceived++;
+                }
+
+                try {
+                    Thread.sleep(sleep);
+                } catch (InterruptedException ie) {
+                    LOG.debug("Exception: " + ie);
+                    // Keep the interrupt visible to the caller and stop
+                    // polling; further sleeps would fail immediately anyway.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+
+            LOG.info("Consumer Finished");
+            LOG.info("Received " + messageReceived);
+            LOG.info("Not Received " + messagesNotReceived);
+        } catch (JMSException e) {
+            LOG.error("Exception Executing SimpleConsumer: " + getStackTrace(e));
+            // Surface the failure right away instead of letting the test run
+            // on and fail later on an unrelated-looking count mismatch.
+            fail("Durable subscriber failed: " + e);
+        } finally {
+            // Always disconnect so the durable subscription goes offline even
+            // when receiving failed.
+            if (consumer != null) {
+                try {
+                    consumer.close();
+                } catch (JMSException e) {
+                    LOG.error("Exception Executing SimpleConsumer: " + getStackTrace(e));
+                }
+            }
+        }
+    }
+
+    /**
+     * Renders the stack trace of a throwable as a string.
+     *
+     * @param aThrowable
+     *            the throwable to render
+     * @return the text produced by {@link Throwable#printStackTrace(PrintWriter)}
+     */
+    public String getStackTrace(Throwable aThrowable) {
+        final Writer result = new StringWriter();
+        final PrintWriter printWriter = new PrintWriter(result);
+        aThrowable.printStackTrace(printWriter);
+        // Flush explicitly so nothing buffered by the PrintWriter is lost.
+        printWriter.flush();
+        return result.toString();
+    }
+
+    /**
+     * Closes a connection whose owner failed during construction, logging
+     * rather than throwing so the original failure is the one reported.
+     */
+    private static void closeQuietly(Connection connection) {
+        try {
+            connection.close();
+        } catch (JMSException e) {
+            LOG.warn("Failed to close connection after setup error", e);
+        }
+    }
+
+    /**
+     * Publishes non-persistent text messages of a fixed length to a topic.
+     */
+    public class Producer {
+
+        protected ConnectionFactory factory;
+        protected transient Connection connection;
+        protected transient Session session;
+        protected transient MessageProducer producer;
+        // Default message body length; kept for backward compatibility.
+        protected static final int messageSize = 1024;
+
+        // Body length requested through the constructor.
+        private final int payloadSize;
+
+        /**
+         * Connects to the broker and creates a non-persistent producer.
+         *
+         * @param brokerURL
+         *            URL of the broker to connect to
+         * @param interest
+         *            name of the topic to publish to
+         * @param messageSize
+         *            length, in characters, of each generated message body;
+         *            must not be negative
+         * @param ttl
+         *            time to live in milliseconds; only applied to the producer
+         *            when greater than zero
+         * @throws JMSException
+         *             if the connection, session or producer cannot be created
+         * @throws IllegalArgumentException
+         *             if {@code messageSize} is negative
+         */
+        public Producer(String brokerURL, String interest, int messageSize, long ttl) throws JMSException {
+            if (messageSize < 0) {
+                throw new IllegalArgumentException("messageSize must not be negative: " + messageSize);
+            }
+            // Honour the requested size; it used to be silently ignored in
+            // favour of the class constant of the same name.
+            this.payloadSize = messageSize;
+
+            factory = new ActiveMQConnectionFactory(brokerURL);
+            connection = factory.createConnection();
+            try {
+                connection.start();
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                producer = session.createProducer(session.createTopic(interest));
+                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                if (ttl > 0) {
+                    producer.setTimeToLive(ttl);
+                }
+            } catch (JMSException e) {
+                // Do not leak the connection when setup fails half-way.
+                closeQuietly(connection);
+                throw e;
+            }
+        }
+
+        /**
+         * Closes the underlying connection, if one was opened.
+         */
+        public void close() throws JMSException {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+
+        /**
+         * Sends a single text message with the body "test message".
+         */
+        protected void sendMessage() throws JMSException {
+            TextMessage textMessage = session.createTextMessage("test message");
+            producer.send(textMessage);
+        }
+
+        /**
+         * Sends {@code count} generated text messages.
+         *
+         * @param count
+         *            number of messages to send
+         */
+        protected void sendMessages(int count) throws JMSException {
+            for (int i = 0; i < count; i++) {
+                TextMessage textMessage = session.createTextMessage(createMessageText(i));
+                producer.send(textMessage);
+            }
+        }
+
+        /**
+         * Builds a message body of exactly the configured length: an
+         * index/timestamp header, truncated or right-padded with spaces.
+         */
+        private String createMessageText(int index) {
+            StringBuilder buffer = new StringBuilder(payloadSize);
+            buffer.append("Message: " + index + " sent at: " + new Date());
+            if (buffer.length() > payloadSize) {
+                return buffer.substring(0, payloadSize);
+            }
+            for (int i = buffer.length(); i < payloadSize; i++) {
+                buffer.append(' ');
+            }
+            return buffer.toString();
+        }
+
+        /**
+         * Commits the session. NOTE(review): the session is created
+         * non-transacted by the constructor, so this looks unusable as
+         * written - confirm before relying on it.
+         */
+        protected void commitTransaction() throws JMSException {
+            session.commit();
+        }
+    }
+
+    /**
+     * Durable topic subscriber using the subscription name "MyDurableTopic".
+     */
+    public class Consumer {
+
+        private ConnectionFactory factory;
+        private ActiveMQConnection connection;
+        private Session session;
+        private MessageConsumer messageConsumer;
+
+        /**
+         * Connects to the broker and creates the durable subscriber.
+         *
+         * @param brokerURL
+         *            URL of the broker to connect to
+         * @param interest
+         *            name of the topic to subscribe to
+         * @param clientId
+         *            JMS client id identifying the durable subscriber
+         * @throws JMSException
+         *             if the connection, session or subscriber cannot be
+         *             created
+         */
+        public Consumer(String brokerURL, String interest, String clientId) throws JMSException {
+            factory = new ActiveMQConnectionFactory(brokerURL);
+            connection = (ActiveMQConnection) factory.createConnection();
+            try {
+                connection.setClientID(clientId);
+                connection.start();
+                // Set after the connection is built, so presumably this
+                // overrides any prefetch configured through the URL.
+                connection.getPrefetchPolicy().setAll(15);
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Destination destination = session.createTopic(interest);
+                messageConsumer = session.createDurableSubscriber((Topic) destination, "MyDurableTopic");
+            } catch (JMSException e) {
+                // Do not leak the connection when setup fails half-way.
+                closeQuietly(connection);
+                throw e;
+            }
+        }
+
+        /**
+         * Receives and discards messages until none arrives within 500 ms.
+         */
+        public void deleteAllMessages() throws JMSException {
+            while (getMessage(500) != null) {
+                // empty queue
+            }
+        }
+
+        /**
+         * Waits for the next message.
+         *
+         * @param timeout
+         *            maximum time to wait, in milliseconds
+         * @return the message returned by the receive call, or {@code null} if
+         *         none was returned
+         */
+        public Message getMessage(int timeout) throws JMSException {
+            return messageConsumer.receive(timeout);
+        }
+
+        /**
+         * Closes the subscriber, the session and the connection. The
+         * connection is closed even if closing the subscriber or session
+         * fails.
+         */
+        public void close() throws JMSException {
+            try {
+                if (messageConsumer != null) {
+                    messageConsumer.close();
+                }
+                if (session != null) {
+                    session.close();
+                }
+            } finally {
+                if (connection != null) {
+                    connection.close();
+                }
+            }
+        }
+
+        /**
+         * @return the JMS session used by this consumer
+         */
+        public Session getSession() {
+            return session;
+        }
+    }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest.java
------------------------------------------------------------------------------
svn:eol-style = native