Author: rgodfrey Date: Wed Jan 11 13:10:36 2012 New Revision: 1229996 URL: http://svn.apache.org/viewvc?rev=1229996&view=rev Log: QPID-3720 : Add alternative (C++ style) grouping and apply comments from Robbie Gemmell
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java - copied, changed from r1229096, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java qpid/trunk/qpid/java/test-profiles/CPPExcludes Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1229996&r1=1229995&r2=1229996&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Jan 11 13:10:36 2012 @@ -42,9 +42,7 @@ import org.apache.qpid.server.management import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.subscription.MessageGroupManager; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionList; +import org.apache.qpid.server.subscription.*; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -67,10 +65,15 @@ import java.util.concurrent.atomic.Atomi 
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener +public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; + private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group"; + private static final String QPID_DEFAULT_MESSAGE_GROUP = "qpid.default-message-group"; + private static final String QPID_NO_GROUP = "qpid.no-group"; + // TODO - should make this configurable at the vhost / broker level + private static final int DEFAULT_MAX_GROUPS = 255; private final VirtualHost _virtualHost; @@ -265,7 +268,18 @@ public class SimpleAMQQueue implements A if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY)) { - _messageGroupManager = new MessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), 255); + if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1")) + { + String defaultGroup = String.valueOf(arguments.get(QPID_DEFAULT_MESSAGE_GROUP)); + _messageGroupManager = + new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), + defaultGroup == null ? 
QPID_NO_GROUP : defaultGroup, + this); + } + else + { + _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), DEFAULT_MAX_GROUPS); + } } else { @@ -488,28 +502,7 @@ public class SimpleAMQQueue implements A if(_messageGroupManager != null) { - QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription); - _messageGroupManager.clearAssignments(subscription); - - if(entry != null) - { - SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); - // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards - while (subscriberIter.advance()) - { - Subscription sub = subscriberIter.getNode().getSubscription(); - - // we don't make browsers send the same stuff twice - if (sub.seesRequeues()) - { - updateSubRequeueEntry(sub, entry); - } - } - - deliverAsync(); - - } - + resetSubPointersForGroups(subscription, true); } // auto-delete queues must be deleted if there are no remaining subscribers @@ -531,6 +524,34 @@ public class SimpleAMQQueue implements A } + public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments) + { + QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription); + if(clearAssignments) + { + _messageGroupManager.clearAssignments(subscription); + } + + if(entry != null) + { + SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards + while (subscriberIter.advance()) + { + Subscription sub = subscriberIter.getNode().getSubscription(); + + // we don't make browsers send the same stuff twice + if (sub.seesRequeues()) + { + updateSubRequeueEntry(sub, entry); + } + } + + deliverAsync(); + + } + } + public boolean getDeleteOnNoConsumers() { return _deleteOnNoConsumers; @@ -1855,6 +1876,19 
@@ public class SimpleAMQQueue implements A } } + public boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub) + { + QueueContext context = (QueueContext) sub.getQueueContext(); + if(context != null) + { + QueueEntry releasedNode = context._releasedEntry; + return releasedNode == null || releasedNode.compareTo(entry) < 0; + } + else + { + return false; + } + } /** * Used by queue Runners to asynchronously deliver messages to consumers. Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java (from r1229096, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java) URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java&r1=1229096&r2=1229996&rev=1229996&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java Wed Jan 11 13:10:36 2012 @@ -29,16 +29,16 @@ import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; -public class MessageGroupManager +public class AssignedSubscriptionMessageGroupManager implements MessageGroupManager { - private static final Logger _logger = LoggerFactory.getLogger(MessageGroupManager.class); + private static final Logger _logger = LoggerFactory.getLogger(AssignedSubscriptionMessageGroupManager.class); private final String _groupId; private final ConcurrentHashMap<Integer, Subscription> 
_groupMap = new ConcurrentHashMap<Integer, Subscription>(); private final int _groupMask; - public MessageGroupManager(final String groupId, final int maxGroups) + public AssignedSubscriptionMessageGroupManager(final String groupId, final int maxGroups) { _groupId = groupId; _groupMask = pow2(maxGroups)-1; Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java?rev=1229996&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java (added) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java Wed Jan 11 13:10:36 2012 @@ -0,0 +1,270 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.server.subscription; + +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +public class DefinedGroupMessageGroupManager implements MessageGroupManager +{ + private static final Logger _logger = LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class); + + private final String _groupId; + private final String _defaultGroup; + private final Map<Object, Group> _groupMap = new HashMap<Object, Group>(); + private final SubscriptionResetHelper _resetHelper; + + private final class Group + { + private final Object _group; + private Subscription _subscription; + private int _activeCount; + + private Group(final Object key, final Subscription subscription) + { + _group = key; + _subscription = subscription; + } + + public boolean add() + { + if(_subscription != null) + { + _activeCount++; + return true; + } + else + { + return false; + } + } + + public void subtract() + { + if(--_activeCount == 0) + { + _resetHelper.resetSubPointersForGroups(_subscription, false); + _subscription = null; + _groupMap.remove(_group); + } + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + Group group = (Group) o; + + return _group.equals(group._group); + } + + @Override + public int hashCode() + { + return _group.hashCode(); + } + + public boolean isValid() + { + return !(_subscription == null || (_activeCount == 0 && _subscription.isClosed())); + } + + public Subscription getSubscription() + { + return _subscription; + } + + @Override + public String toString() + { + return "Group{" + + "_group=" + _group + + ", _subscription=" + _subscription + + ", _activeCount=" 
+ _activeCount + + '}'; + } + } + + public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, SubscriptionResetHelper resetHelper) + { + _groupId = groupId; + _defaultGroup = defaultGroup; + _resetHelper = resetHelper; + } + + public synchronized Subscription getAssignedSubscription(final QueueEntry entry) + { + Object groupId = getKey(entry); + + Group group = _groupMap.get(groupId); + return group == null || !group.isValid() ? null : group.getSubscription(); + } + + public synchronized boolean acceptMessage(final Subscription sub, final QueueEntry entry) + { + Object groupId = getKey(entry); + Group group = _groupMap.get(groupId); + + if(group == null || !group.isValid()) + { + group = new Group(groupId, sub); + + _groupMap.put(groupId, group); + + // there's a small chance that the group became empty between the point at which getNextAvailable() was + // called on the subscription, and when accept message is called... in that case we want to avoid delivering + // out of order + if(_resetHelper.isEntryAheadOfSubscription(entry, sub)) + { + return false; + } + + } + + Subscription assignedSub = group.getSubscription(); + + if(assignedSub == sub) + { + entry.addStateChangeListener(new GroupStateChangeListener(group, entry)); + return true; + } + else + { + return false; + } + } + + + public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub) + { + EntryFinder visitor = new EntryFinder(sub); + sub.getQueue().visit(visitor); + _logger.debug("Earliest available entry for " + sub + " is " + visitor.getEntry() + (visitor.getEntry() == null ? 
"" : " : " + getKey(visitor.getEntry()))); + return visitor.getEntry(); + } + + private class EntryFinder implements AMQQueue.Visitor + { + private QueueEntry _entry; + private Subscription _sub; + + public EntryFinder(final Subscription sub) + { + _sub = sub; + } + + public boolean visit(final QueueEntry entry) + { + if(!entry.isAvailable()) + return false; + + Object groupId = getKey(entry); + + Group group = _groupMap.get(groupId); + if(group != null && group.getSubscription() == _sub) + { + _entry = entry; + return true; + } + else + { + return false; + } + } + + public QueueEntry getEntry() + { + return _entry; + } + } + + + public void clearAssignments(final Subscription sub) + { + } + + private Object getKey(QueueEntry entry) + { + ServerMessage message = entry.getMessage(); + AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader(); + Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId); + if(groupVal == null) + { + groupVal = _defaultGroup; + } + return groupVal; + } + + private class GroupStateChangeListener implements QueueEntry.StateChangeListener + { + private final Group _group; + + public GroupStateChangeListener(final Group group, + final QueueEntry entry) + { + _group = group; + } + + public void stateChanged(final QueueEntry entry, + final QueueEntry.State oldState, + final QueueEntry.State newState) + { + synchronized (DefinedGroupMessageGroupManager.this) + { + if(_group.isValid()) + { + if(oldState != newState) + { + if(newState == QueueEntry.State.ACQUIRED) + { + _logger.debug("Adding to " + _group); + _group.add(); + } + else if(oldState == QueueEntry.State.ACQUIRED) + { + _logger.debug("Subtracting from " + _group); + _group.subtract(); + } + } + } + else + { + entry.removeStateChangeListener(this); + } + } + } + } +} Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java?rev=1229996&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java (added) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java Wed Jan 11 13:10:36 2012 @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.server.subscription; + +import org.apache.qpid.server.queue.QueueEntry; + +public interface MessageGroupManager +{ + public interface SubscriptionResetHelper + { + public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments); + + boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub); + } + + Subscription getAssignedSubscription(QueueEntry entry); + + boolean acceptMessage(Subscription sub, QueueEntry entry); + + QueueEntry findEarliestAssignedAvailableEntry(Subscription sub); + + void clearAssignments(Subscription sub); +} Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java?rev=1229996&r1=1229995&r2=1229996&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java Wed Jan 11 13:10:36 2012 @@ -34,18 +34,13 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; -import javax.naming.NamingException; import java.util.HashMap; import java.util.Map; public class MessageGroupQueueTest extends QpidBrokerTestCase { - private static final int TIMEOUT = 1500; - protected final String QUEUE = "MessageGroupQueue"; - private static final int MSG_COUNT = 50; - private Connection producerConnection; private MessageProducer producer; private Session producerSession; @@ -73,38 +68,53 @@ public class MessageGroupQueueTest exten super.tearDown(); } + + public void testSimpleGroupAssignment() throws Exception + { + simpleGroupAssignment(false); + } + + public void testSharedGroupSimpleGroupAssignment() 
throws Exception + { + simpleGroupAssignment(true); + } + + /** * Pre populate the queue with messages with groups as follows - * + * * ONE * TWO * ONE * TWO - * + * * Create two consumers with prefetch of 1, the first consumer should then be assigned group ONE, the second * consumer assigned group TWO if they are started in sequence. - * + * * Thus doing - * + * * c1 <--- (ONE) * c2 <--- (TWO) * c2 ack ---> - * + * * c2 should now be able to receive a second message from group TWO (skipping over the message from group ONE) - * + * * i.e. - * + * * c2 <--- (TWO) * c2 ack ---> * c1 <--- (ONE) * c1 ack ---> - * + * */ - public void testSimpleGroupAssignment() throws Exception + private void simpleGroupAssignment(boolean sharedGroups) throws AMQException, JMSException { final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("qpid.group_header_key","group"); - arguments.put("qpid.shared_msg_group","1"); + if(sharedGroups) + { + arguments.put("qpid.shared_msg_group","1"); + } ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); @@ -112,7 +122,7 @@ public class MessageGroupQueueTest exten producer = producerSession.createProducer(queue); String[] groups = { "ONE", "TWO"}; - + for (int msg = 0; msg < 4; msg++) { producer.send(createMessage(msg, groups[msg % groups.length])); @@ -125,7 +135,7 @@ public class MessageGroupQueueTest exten Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); - + MessageConsumer consumer1 = cs1.createConsumer(queue); MessageConsumer consumer2 = cs2.createConsumer(queue); @@ -154,33 +164,47 @@ public class MessageGroupQueueTest exten cs1Received2.acknowledge(); cs2Received2.acknowledge(); - + 
assertNull(consumer1.receive(1000)); assertNull(consumer2.receive(1000)); } + + public void testConsumerCloseGroupAssignment() throws Exception + { + consumerCloseGroupAssignment(false); + } + + public void testSharedGroupConsumerCloseGroupAssignment() throws Exception + { + consumerCloseGroupAssignment(true); + } + /** - * + * * Tests that upon closing a consumer, groups previously assigned to that consumer are reassigned to a different * consumer. - * + * * Pre-populate the queue as ONE, ONE, TWO, ONE - * + * * create in sequence two consumers - * + * * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2) - * + * * Then close c1 before acking. - * + * * If we now attempt to receive from c2, then the remaining messages in group ONE should be available (which * requires c2 to go "backwards" in the queue). - * - * */ - public void testConsumerCloseGroupAssignment() throws Exception + * + **/ + private void consumerCloseGroupAssignment(boolean sharedGroups) throws AMQException, JMSException { final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("qpid.group_header_key","group"); - arguments.put("qpid.shared_msg_group","1"); + if(sharedGroups) + { + arguments.put("qpid.shared_msg_group","1"); + } ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); @@ -197,9 +221,8 @@ public class MessageGroupQueueTest exten producerSession.close(); producerConnection.close(); - Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); - Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); - + Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + Session cs2 = ((AMQConnection)consumerConnection).createSession(true, 
Session.SESSION_TRANSACTED,1); MessageConsumer consumer1 = cs1.createConsumer(queue); @@ -208,40 +231,54 @@ public class MessageGroupQueueTest exten Message cs1Received = consumer1.receive(1000); assertNotNull("Consumer 1 should have received first message", cs1Received); + assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg")); Message cs2Received = consumer2.receive(1000); assertNotNull("Consumer 2 should have received first message", cs2Received); - cs2Received.acknowledge(); + assertEquals("incorrect message received", 3, cs2Received.getIntProperty("msg")); + cs2.commit(); Message cs2Received2 = consumer2.receive(1000); - assertNull("Consumer 2 should not have received second message", cs2Received2); + assertNull("Consumer 2 should not yet have received a second message", cs2Received2); consumer1.close(); - cs1Received.acknowledge(); + cs1.commit(); Message cs2Received3 = consumer2.receive(1000); assertNotNull("Consumer 2 should have received second message", cs2Received3); - assertEquals("Unexpected group", cs2Received3.getStringProperty("group"), - "ONE"); + assertEquals("Unexpected group", "ONE", cs2Received3.getStringProperty("group")); + assertEquals("incorrect message received", 2, cs2Received3.getIntProperty("msg")); - cs2Received3.acknowledge(); + cs2.commit(); Message cs2Received4 = consumer2.receive(1000); assertNotNull("Consumer 2 should have received third message", cs2Received4); - assertEquals("Unexpected group", cs2Received4.getStringProperty("group"), - "ONE"); - - cs2Received4.acknowledge(); + assertEquals("Unexpected group", "ONE", cs2Received4.getStringProperty("group")); + assertEquals("incorrect message received", 4, cs2Received4.getIntProperty("msg")); + cs2.commit(); assertNull(consumer2.receive(1000)); } + + + public void testConsumerCloseWithRelease() throws Exception + { + consumerCloseWithRelease(false); + } + + public void testSharedGroupConsumerCloseWithRelease() throws Exception + { + 
consumerCloseWithRelease(true); + } + + /** * * Tests that upon closing a consumer and its session, groups previously assigned to that consumer are reassigned @@ -259,12 +296,14 @@ public class MessageGroupQueueTest exten * requires c2 to go "backwards" in the queue). The first such message should be marked as redelivered * */ - - public void testConsumerCloseWithRelease() throws Exception + private void consumerCloseWithRelease(boolean sharedGroups) throws AMQException, JMSException { final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("qpid.group_header_key","group"); - arguments.put("qpid.shared_msg_group","1"); + if(sharedGroups) + { + arguments.put("qpid.shared_msg_group","1"); + } ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); @@ -282,61 +321,155 @@ public class MessageGroupQueueTest exten producerSession.close(); producerConnection.close(); - Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); - Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); + Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); MessageConsumer consumer1 = cs1.createConsumer(queue); - MessageConsumer consumer2 = cs2.createConsumer(queue); consumerConnection.start(); + + MessageConsumer consumer2 = cs2.createConsumer(queue); + Message cs1Received = consumer1.receive(1000); - assertNotNull("Consumer 1 should have received first message", cs1Received); + assertNotNull("Consumer 1 should have received its first message", cs1Received); + assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg")); Message received = 
consumer2.receive(1000); - assertNotNull("Consumer 2 should have received first message", received); - Message first = received; + assertNotNull("Consumer 2 should have received its first message", received); + assertEquals("incorrect message received", 3, received.getIntProperty("msg")); received = consumer2.receive(1000); - assertNull("Consumer 2 should not have received second message", received); + assertNull("Consumer 2 should not yet have received second message", received); consumer1.close(); cs1.close(); - first.acknowledge(); + cs2.commit(); received = consumer2.receive(1000); - assertNotNull("Consumer 2 should have received second message", received); - assertEquals("Unexpected group", received.getStringProperty("group"), - "ONE"); + assertNotNull("Consumer 2 should now have received second message", received); + assertEquals("Unexpected group", "ONE", received.getStringProperty("group")); + assertEquals("incorrect message received", 1, received.getIntProperty("msg")); assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"), received.getJMSRedelivered()); - received.acknowledge(); + cs2.commit(); received = consumer2.receive(1000); - assertNotNull("Consumer 2 should have received third message", received); - assertEquals("Unexpected group", received.getStringProperty("group"), - "ONE"); + assertNotNull("Consumer 2 should have received a third message", received); + assertEquals("Unexpected group", "ONE", received.getStringProperty("group")); + assertEquals("incorrect message received", 2, received.getIntProperty("msg")); - received.acknowledge(); + cs2.commit(); received = consumer2.receive(1000); - assertNotNull("Consumer 2 should have received fourth message", received); - assertEquals("Unexpected group", received.getStringProperty("group"), - "ONE"); + assertNotNull("Consumer 2 should have received a fourth message", received); + assertEquals("Unexpected group", "ONE", received.getStringProperty("group")); + 
assertEquals("incorrect message received", 4, received.getIntProperty("msg")); - received.acknowledge(); + cs2.commit(); assertNull(consumer2.receive(1000)); } - + public void testGroupAssignmentSurvivesEmpty() throws JMSException, AMQException + { + groupAssignmentOnEmpty(false); + } + + public void testSharedGroupAssignmentDoesNotSurviveEmpty() throws JMSException, AMQException + { + groupAssignmentOnEmpty(true); + } + + private void groupAssignmentOnEmpty(boolean sharedGroups) throws AMQException, JMSException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("qpid.group_header_key","group"); + if(sharedGroups) + { + arguments.put("qpid.shared_msg_group","1"); + } + + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + producer.send(createMessage(1, "ONE")); + producer.send(createMessage(2, "TWO")); + producer.send(createMessage(3, "THREE")); + producer.send(createMessage(4, "ONE")); + + producerSession.commit(); + producer.close(); + producerSession.close(); + producerConnection.close(); + + Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + + + MessageConsumer consumer1 = cs1.createConsumer(queue); + + consumerConnection.start(); + + MessageConsumer consumer2 = cs2.createConsumer(queue); + + Message received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received its first message", received); + assertEquals("incorrect message received", 1, received.getIntProperty("msg")); + + received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should 
have received its first message", received); + assertEquals("incorrect message received", 2, received.getIntProperty("msg")); + + cs1.commit(); + + received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received its second message", received); + assertEquals("incorrect message received", 3, received.getIntProperty("msg")); + + // We expect different behaviours from "shared groups": here the assignment of a subscription to a group + // is terminated when there are no outstanding delivered but unacknowledged messages. In contrast, with a + // standard message grouping queue the assignment will be retained until the subscription is no longer + // registered + if(sharedGroups) + { + cs2.commit(); + received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received its second message", received); + assertEquals("incorrect message received", 4, received.getIntProperty("msg")); + + cs2.commit(); + } + else + { + cs2.commit(); + received = consumer2.receive(1000); + + assertNull("Consumer 2 should not have received a second message", received); + + cs1.commit(); + + received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received its third message", received); + assertEquals("incorrect message received", 4, received.getIntProperty("msg")); + + } + + } + + private Message createMessage(int msg, String group) throws JMSException { Message send = producerSession.createTextMessage("Message: " + msg); Modified: qpid/trunk/qpid/java/test-profiles/CPPExcludes URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPExcludes?rev=1229996&r1=1229995&r2=1229996&view=diff ============================================================================== --- qpid/trunk/qpid/java/test-profiles/CPPExcludes (original) +++ qpid/trunk/qpid/java/test-profiles/CPPExcludes Wed Jan 11 13:10:36 2012 @@ -172,3 +172,9 @@ org.apache.qpid.server.management.AMQUse // QPID-3133: On 0-10, the exception listener is currently not 
invoked when reconnection fails to occur. org.apache.qpid.server.failover.FailoverMethodTest#* +// CPP Broker does not implement non-"shared group" message groups +org.apache.qpid.server.queue.MessageGroupQueueTest#testSimpleGroupAssignment +org.apache.qpid.server.queue.MessageGroupQueueTest#testConsumerCloseGroupAssignment +org.apache.qpid.server.queue.MessageGroupQueueTest#testConsumerCloseWithRelease +org.apache.qpid.server.queue.MessageGroupQueueTest#testGroupAssignmentSurvivesEmpty + --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org