Author: rgodfrey
Date: Wed Jan 11 13:10:36 2012
New Revision: 1229996

URL: http://svn.apache.org/viewvc?rev=1229996&view=rev
Log:
QPID-3720 : Add alternative (C++ style) grouping and apply comments from Robbie 
Gemmell

Added:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
      - copied, changed from r1229096, 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
    qpid/trunk/qpid/java/test-profiles/CPPExcludes

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1229996&r1=1229995&r2=1229996&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Wed Jan 11 13:10:36 2012
@@ -42,9 +42,7 @@ import org.apache.qpid.server.management
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.MessageGroupManager;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.subscription.*;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -67,10 +65,15 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
+public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, 
MessageGroupManager.SubscriptionResetHelper
 {
     private static final Logger _logger = 
Logger.getLogger(SimpleAMQQueue.class);
     private static final String QPID_GROUP_HEADER_KEY = 
"qpid.group_header_key";
+    private static final String QPID_SHARED_MSG_GROUP = 
"qpid.shared_msg_group";
+    private static final String QPID_DEFAULT_MESSAGE_GROUP = 
"qpid.default-message-group";
+    private static final String QPID_NO_GROUP = "qpid.no-group";
+    // TODO - should make this configurable at the vhost / broker level
+    private static final int DEFAULT_MAX_GROUPS = 255;
 
 
     private final VirtualHost _virtualHost;
@@ -265,7 +268,18 @@ public class SimpleAMQQueue implements A
 
         if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
         {
-            _messageGroupManager = new 
MessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), 255);
+            if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && 
String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1"))
+            {
+                String defaultGroup = 
String.valueOf(arguments.get(QPID_DEFAULT_MESSAGE_GROUP));
+                _messageGroupManager =
+                        new 
DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)),
+                                defaultGroup == null ? QPID_NO_GROUP : 
defaultGroup,
+                                this);
+            }
+            else
+            {
+                _messageGroupManager = new 
AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)),
 DEFAULT_MAX_GROUPS);
+            }
         }
         else
         {
@@ -488,28 +502,7 @@ public class SimpleAMQQueue implements A
 
             if(_messageGroupManager != null)
             {
-                QueueEntry entry = 
_messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
-                _messageGroupManager.clearAssignments(subscription);
-                
-                if(entry != null)
-                {
-                    SubscriptionList.SubscriptionNodeIterator subscriberIter = 
_subscriptionList.iterator();
-                    // iterate over all the subscribers, and if they are in 
advance of this queue entry then move them backwards
-                    while (subscriberIter.advance())
-                    {
-                        Subscription sub = 
subscriberIter.getNode().getSubscription();
-
-                        // we don't make browsers send the same stuff twice
-                        if (sub.seesRequeues())
-                        {
-                            updateSubRequeueEntry(sub, entry);
-                        }
-                    }
-
-                    deliverAsync();
-
-                }
-                
+                resetSubPointersForGroups(subscription, true);
             }
 
             // auto-delete queues must be deleted if there are no remaining 
subscribers
@@ -531,6 +524,34 @@ public class SimpleAMQQueue implements A
 
     }
 
+    public void resetSubPointersForGroups(Subscription subscription, boolean 
clearAssignments)
+    {
+        QueueEntry entry = 
_messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+        if(clearAssignments)
+        {
+            _messageGroupManager.clearAssignments(subscription);
+        }
+
+        if(entry != null)
+        {
+            SubscriptionList.SubscriptionNodeIterator subscriberIter = 
_subscriptionList.iterator();
+            // iterate over all the subscribers, and if they are in advance of 
this queue entry then move them backwards
+            while (subscriberIter.advance())
+            {
+                Subscription sub = subscriberIter.getNode().getSubscription();
+
+                // we don't make browsers send the same stuff twice
+                if (sub.seesRequeues())
+                {
+                    updateSubRequeueEntry(sub, entry);
+                }
+            }
+
+            deliverAsync();
+
+        }
+    }
+
     public boolean getDeleteOnNoConsumers()
     {
         return _deleteOnNoConsumers;
@@ -1855,6 +1876,19 @@ public class SimpleAMQQueue implements A
         }
     }
 
+    public boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription 
sub)
+    {
+        QueueContext context = (QueueContext) sub.getQueueContext();
+        if(context != null)
+        {
+            QueueEntry releasedNode = context._releasedEntry;
+            return releasedNode == null || releasedNode.compareTo(entry) < 0;
+        }
+        else
+        {
+            return false;
+        }
+    }
 
     /**
      * Used by queue Runners to asynchronously deliver messages to consumers.

Copied: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
 (from r1229096, 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java)
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java&r1=1229096&r2=1229996&rev=1229996&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
 Wed Jan 11 13:10:36 2012
@@ -29,16 +29,16 @@ import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 
 
-public class MessageGroupManager
+public class AssignedSubscriptionMessageGroupManager implements 
MessageGroupManager
 {
-    private static final Logger _logger = 
LoggerFactory.getLogger(MessageGroupManager.class);
+    private static final Logger _logger = 
LoggerFactory.getLogger(AssignedSubscriptionMessageGroupManager.class);
 
 
     private final String _groupId;
     private final ConcurrentHashMap<Integer, Subscription> _groupMap = new 
ConcurrentHashMap<Integer, Subscription>();
     private final int _groupMask;
 
-    public MessageGroupManager(final String groupId, final int maxGroups)
+    public AssignedSubscriptionMessageGroupManager(final String groupId, final 
int maxGroups)
     {
         _groupId = groupId;
         _groupMask = pow2(maxGroups)-1;

Added: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java?rev=1229996&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
 (added)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
 Wed Jan 11 13:10:36 2012
@@ -0,0 +1,270 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefinedGroupMessageGroupManager implements MessageGroupManager
+{
+    private static final Logger _logger = 
LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class);
+
+    private final String _groupId;
+    private final String _defaultGroup;
+    private final Map<Object, Group> _groupMap = new HashMap<Object, Group>();
+    private final SubscriptionResetHelper _resetHelper;
+
+    private final class Group
+    {
+        private final Object _group;
+        private Subscription _subscription;
+        private int _activeCount;
+
+        private Group(final Object key, final Subscription subscription)
+        {
+            _group = key;
+            _subscription = subscription;
+        }
+        
+        public boolean add()
+        {
+            if(_subscription != null)
+            {
+                _activeCount++;
+                return true;
+            }
+            else
+            {
+                return false;
+            }
+        }
+        
+        public void subtract()
+        {
+            if(--_activeCount == 0)
+            {
+                _resetHelper.resetSubPointersForGroups(_subscription, false);
+                _subscription = null;
+                _groupMap.remove(_group);
+            }
+        }
+
+        @Override
+        public boolean equals(final Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+
+            Group group = (Group) o;
+
+            return _group.equals(group._group);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return _group.hashCode();
+        }
+
+        public boolean isValid()
+        {
+            return !(_subscription == null || (_activeCount == 0 && 
_subscription.isClosed()));
+        }
+
+        public Subscription getSubscription()
+        {
+            return _subscription;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Group{" +
+                    "_group=" + _group +
+                    ", _subscription=" + _subscription +
+                    ", _activeCount=" + _activeCount +
+                    '}';
+        }
+    }
+
+    public DefinedGroupMessageGroupManager(final String groupId, String 
defaultGroup, SubscriptionResetHelper resetHelper)
+    {
+        _groupId = groupId;
+        _defaultGroup = defaultGroup;
+        _resetHelper = resetHelper;
+    }
+    
+    public synchronized Subscription getAssignedSubscription(final QueueEntry 
entry)
+    {
+        Object groupId = getKey(entry);
+
+        Group group = _groupMap.get(groupId);
+        return group == null || !group.isValid() ? null : 
group.getSubscription();
+    }
+
+    public synchronized boolean acceptMessage(final Subscription sub, final 
QueueEntry entry)
+    {
+        Object groupId = getKey(entry);
+        Group group = _groupMap.get(groupId);
+        
+        if(group == null || !group.isValid())
+        {
+            group = new Group(groupId, sub);
+
+            _groupMap.put(groupId, group);
+
+            // there's a small chance that the group became empty between the 
point at which getNextAvailable() was
+            // called on the subscription, and when accept message is 
called... in that case we want to avoid delivering
+            // out of order
+            if(_resetHelper.isEntryAheadOfSubscription(entry, sub))
+            {
+                return false;
+            }
+
+        }
+        
+        Subscription assignedSub = group.getSubscription();
+        
+        if(assignedSub == sub)
+        {
+            entry.addStateChangeListener(new GroupStateChangeListener(group, 
entry));
+            return true;
+        }
+        else
+        {
+            return false;            
+        }
+    }
+    
+    
+    public synchronized QueueEntry findEarliestAssignedAvailableEntry(final 
Subscription sub)
+    {
+        EntryFinder visitor = new EntryFinder(sub);
+        sub.getQueue().visit(visitor);
+        _logger.debug("Earliest available entry for " + sub + " is " + 
visitor.getEntry() + (visitor.getEntry() == null ? "" : " : " + 
getKey(visitor.getEntry())));
+        return visitor.getEntry();
+    }
+
+    private class EntryFinder implements AMQQueue.Visitor
+    {
+        private QueueEntry _entry;
+        private Subscription _sub;
+
+        public EntryFinder(final Subscription sub)
+        {
+            _sub = sub;
+        }
+
+        public boolean visit(final QueueEntry entry)
+        {
+            if(!entry.isAvailable())
+                return false;
+
+            Object groupId = getKey(entry);
+
+            Group group = _groupMap.get(groupId);
+            if(group != null && group.getSubscription() == _sub)
+            {
+                _entry = entry;
+                return true;
+            }
+            else
+            {
+                return false;
+            }
+        }
+
+        public QueueEntry getEntry()
+        {
+            return _entry;
+        }
+    }
+
+    
+    public void clearAssignments(final Subscription sub)
+    {
+    }
+    
+    private Object getKey(QueueEntry entry)
+    {
+        ServerMessage message = entry.getMessage();
+        AMQMessageHeader messageHeader = message == null ? null : 
message.getMessageHeader();
+        Object groupVal = messageHeader == null ? _defaultGroup : 
messageHeader.getHeader(_groupId);
+        if(groupVal == null)
+        {
+            groupVal = _defaultGroup;
+        }
+        return groupVal;
+    }
+
+    private class GroupStateChangeListener implements 
QueueEntry.StateChangeListener
+    {
+        private final Group _group;
+
+        public GroupStateChangeListener(final Group group,
+                                        final QueueEntry entry)
+        {
+            _group = group;
+        }
+
+        public void stateChanged(final QueueEntry entry,
+                                 final QueueEntry.State oldState,
+                                 final QueueEntry.State newState)
+        {
+            synchronized (DefinedGroupMessageGroupManager.this)
+            {
+                if(_group.isValid())
+                {
+                    if(oldState != newState)
+                    {
+                        if(newState == QueueEntry.State.ACQUIRED)
+                        {
+                            _logger.debug("Adding to " + _group);
+                            _group.add();
+                        }
+                        else if(oldState == QueueEntry.State.ACQUIRED)
+                        {
+                            _logger.debug("Subtracting from " + _group);
+                            _group.subtract();
+                        }
+                    }
+                }
+                else
+                {
+                    entry.removeStateChangeListener(this);
+                }
+            }
+        }
+    }
+}

Added: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java?rev=1229996&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
 (added)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
 Wed Jan 11 13:10:36 2012
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.QueueEntry;
+
+public interface MessageGroupManager
+{
+    public interface SubscriptionResetHelper
+    {
+        public void resetSubPointersForGroups(Subscription subscription, 
boolean clearAssignments);
+
+        boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub);
+    }
+
+    Subscription getAssignedSubscription(QueueEntry entry);
+
+    boolean acceptMessage(Subscription sub, QueueEntry entry);
+
+    QueueEntry findEarliestAssignedAvailableEntry(Subscription sub);
+
+    void clearAssignments(Subscription sub);
+}

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java?rev=1229996&r1=1229995&r2=1229996&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
 Wed Jan 11 13:10:36 2012
@@ -34,18 +34,13 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
-import javax.naming.NamingException;
 import java.util.HashMap;
 import java.util.Map;
 
 public class MessageGroupQueueTest extends QpidBrokerTestCase
 {
-    private static final int TIMEOUT = 1500;
-
     protected final String QUEUE = "MessageGroupQueue";
 
-    private static final int MSG_COUNT = 50;
-
     private Connection producerConnection;
     private MessageProducer producer;
     private Session producerSession;
@@ -73,38 +68,53 @@ public class MessageGroupQueueTest exten
         super.tearDown();
     }
 
+
+    public void testSimpleGroupAssignment() throws Exception
+    {
+        simpleGroupAssignment(false);
+    }
+
+    public void testSharedGroupSimpleGroupAssignment() throws Exception
+    {
+        simpleGroupAssignment(true);
+    }
+
+
     /**
      * Pre populate the queue with messages with groups as follows
-     * 
+     *
      *  ONE
      *  TWO
      *  ONE
      *  TWO
-     *  
+     *
      *  Create two consumers with prefetch of 1, the first consumer should 
then be assigned group ONE, the second
      *  consumer assigned group TWO if they are started in sequence.
-     *  
+     *
      *  Thus doing
-     *  
+     *
      *  c1 <--- (ONE)
      *  c2 <--- (TWO)
      *  c2 ack --->
-     *  
+     *
      *  c2 should now be able to receive a second message from group TWO 
(skipping over the message from group ONE)
-     *  
+     *
      *  i.e.
-     *  
+     *
      *  c2 <--- (TWO)
      *  c2 ack --->
      *  c1 <--- (ONE)
      *  c1 ack --->
-     *  
+     *
      */
-    public void testSimpleGroupAssignment() throws Exception
+    private void simpleGroupAssignment(boolean sharedGroups) throws 
AMQException, JMSException
     {
         final Map<String,Object> arguments = new HashMap<String, Object>();
         arguments.put("qpid.group_header_key","group");
-        arguments.put("qpid.shared_msg_group","1");
+        if(sharedGroups)
+        {
+            arguments.put("qpid.shared_msg_group","1");
+        }
         ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), 
true, false, false, arguments);
         queue = (Queue) 
producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
 
@@ -112,7 +122,7 @@ public class MessageGroupQueueTest exten
         producer = producerSession.createProducer(queue);
 
         String[] groups = { "ONE", "TWO"};
-        
+
         for (int msg = 0; msg < 4; msg++)
         {
             producer.send(createMessage(msg, groups[msg % groups.length]));
@@ -125,7 +135,7 @@ public class MessageGroupQueueTest exten
         Session cs1 = ((AMQConnection)consumerConnection).createSession(false, 
Session.CLIENT_ACKNOWLEDGE,1);
         Session cs2 = ((AMQConnection)consumerConnection).createSession(false, 
Session.CLIENT_ACKNOWLEDGE,1);
 
-        
+
         MessageConsumer consumer1 = cs1.createConsumer(queue);
         MessageConsumer consumer2 = cs2.createConsumer(queue);
 
@@ -154,33 +164,47 @@ public class MessageGroupQueueTest exten
 
         cs1Received2.acknowledge();
         cs2Received2.acknowledge();
-        
+
         assertNull(consumer1.receive(1000));
         assertNull(consumer2.receive(1000));
     }
 
+
+    public void testConsumerCloseGroupAssignment() throws Exception
+    {
+        consumerCloseGroupAssignment(false);
+    }
+
+    public void testSharedGroupConsumerCloseGroupAssignment() throws Exception
+    {
+        consumerCloseGroupAssignment(true);
+    }
+
     /**
-     * 
+     *
      * Tests that upon closing a consumer, groups previously assigned to that 
consumer are reassigned to a different
      * consumer.
-     * 
+     *
      * Pre-populate the queue as ONE, ONE, TWO, ONE
-     * 
+     *
      * create in sequence two consumers
-     * 
+     *
      * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
-     * 
+     *
      * Then close c1 before acking.
-     * 
+     *
      * If we now attempt to receive from c2, then the remaining messages in 
group ONE should be available (which
      * requires c2 to go "backwards" in the queue).
-     * 
-     * */
-    public void testConsumerCloseGroupAssignment() throws Exception
+     *
+     **/
+    private void consumerCloseGroupAssignment(boolean sharedGroups) throws 
AMQException, JMSException
     {
         final Map<String,Object> arguments = new HashMap<String, Object>();
         arguments.put("qpid.group_header_key","group");
-        arguments.put("qpid.shared_msg_group","1");
+        if(sharedGroups)
+        {
+            arguments.put("qpid.shared_msg_group","1");
+        }
         ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), 
true, false, false, arguments);
         queue = (Queue) 
producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
 
@@ -197,9 +221,8 @@ public class MessageGroupQueueTest exten
         producerSession.close();
         producerConnection.close();
 
-        Session cs1 = ((AMQConnection)consumerConnection).createSession(false, 
Session.CLIENT_ACKNOWLEDGE,1);
-        Session cs2 = ((AMQConnection)consumerConnection).createSession(false, 
Session.CLIENT_ACKNOWLEDGE,1);
-
+        Session cs1 = ((AMQConnection)consumerConnection).createSession(true, 
Session.SESSION_TRANSACTED,1);
+        Session cs2 = ((AMQConnection)consumerConnection).createSession(true, 
Session.SESSION_TRANSACTED,1);
 
         MessageConsumer consumer1 = cs1.createConsumer(queue);
 
@@ -208,40 +231,54 @@ public class MessageGroupQueueTest exten
 
         Message cs1Received = consumer1.receive(1000);
         assertNotNull("Consumer 1 should have received first message", 
cs1Received);
+        assertEquals("incorrect message received", 1, 
cs1Received.getIntProperty("msg"));
 
         Message cs2Received = consumer2.receive(1000);
 
         assertNotNull("Consumer 2 should have received first message", 
cs2Received);
-        cs2Received.acknowledge();
+        assertEquals("incorrect message received", 3, 
cs2Received.getIntProperty("msg"));
+        cs2.commit();
 
         Message cs2Received2 = consumer2.receive(1000);
 
-        assertNull("Consumer 2 should not have received second message", 
cs2Received2);
+        assertNull("Consumer 2 should not yet have received a second message", 
cs2Received2);
 
         consumer1.close();
 
-        cs1Received.acknowledge();
+        cs1.commit();
         Message cs2Received3 = consumer2.receive(1000);
 
         assertNotNull("Consumer 2 should have received second message", 
cs2Received3);
-        assertEquals("Unexpected group", 
cs2Received3.getStringProperty("group"),
-                     "ONE");
+        assertEquals("Unexpected group", "ONE", 
cs2Received3.getStringProperty("group"));
+        assertEquals("incorrect message received", 2, 
cs2Received3.getIntProperty("msg"));
 
-        cs2Received3.acknowledge();
+        cs2.commit();
 
 
         Message cs2Received4 = consumer2.receive(1000);
 
         assertNotNull("Consumer 2 should have received third message", 
cs2Received4);
-        assertEquals("Unexpected group", 
cs2Received4.getStringProperty("group"),
-                     "ONE");
-
-        cs2Received4.acknowledge();
+        assertEquals("Unexpected group", "ONE", 
cs2Received4.getStringProperty("group"));
+        assertEquals("incorrect message received", 4, 
cs2Received4.getIntProperty("msg"));
+        cs2.commit();
 
         assertNull(consumer2.receive(1000));
     }
 
 
+
+    
+    public void testConsumerCloseWithRelease() throws Exception
+    {
+        consumerCloseWithRelease(false);
+    }
+
+    public void testSharedGroupConsumerCloseWithRelease() throws Exception
+    {
+        consumerCloseWithRelease(true);
+    }
+
+
     /**
      *
      * Tests that upon closing a consumer and its session, groups previously 
assigned to that consumer are reassigned
@@ -259,12 +296,14 @@ public class MessageGroupQueueTest exten
      * requires c2 to go "backwards" in the queue). The first such message 
should be marked as redelivered
      *
      */
-    
-    public void testConsumerCloseWithRelease() throws Exception
+    private void consumerCloseWithRelease(boolean sharedGroups) throws 
AMQException, JMSException
     {
         final Map<String,Object> arguments = new HashMap<String, Object>();
         arguments.put("qpid.group_header_key","group");
-        arguments.put("qpid.shared_msg_group","1");
+        if(sharedGroups)
+        {
+            arguments.put("qpid.shared_msg_group","1");
+        }
 
         ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), 
true, false, false, arguments);
         queue = (Queue) 
producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
@@ -282,61 +321,155 @@ public class MessageGroupQueueTest exten
         producerSession.close();
         producerConnection.close();
 
-        Session cs1 = ((AMQConnection)consumerConnection).createSession(false, 
Session.CLIENT_ACKNOWLEDGE,1);
-        Session cs2 = ((AMQConnection)consumerConnection).createSession(false, 
Session.CLIENT_ACKNOWLEDGE,1);
+        Session cs1 = ((AMQConnection)consumerConnection).createSession(true, 
Session.SESSION_TRANSACTED,1);
+        Session cs2 = ((AMQConnection)consumerConnection).createSession(true, 
Session.SESSION_TRANSACTED,1);
 
 
         MessageConsumer consumer1 = cs1.createConsumer(queue);
-        MessageConsumer consumer2 = cs2.createConsumer(queue);
 
         consumerConnection.start();
+
+        MessageConsumer consumer2 = cs2.createConsumer(queue);
+
         Message cs1Received = consumer1.receive(1000);
-        assertNotNull("Consumer 1 should have received first message", 
cs1Received);
+        assertNotNull("Consumer 1 should have received its first message", 
cs1Received);
+        assertEquals("incorrect message received", 1, 
cs1Received.getIntProperty("msg"));
 
         Message received = consumer2.receive(1000);
 
-        assertNotNull("Consumer 2 should have received first message", 
received);
-        Message first = received;
+        assertNotNull("Consumer 2 should have received its first message", 
received);
+        assertEquals("incorrect message received", 3, 
received.getIntProperty("msg"));
 
         received = consumer2.receive(1000);
 
-        assertNull("Consumer 2 should not have received second message", 
received);
+        assertNull("Consumer 2 should not yet have received second message", 
received);
 
         consumer1.close();
         cs1.close();
-        first.acknowledge();
+        cs2.commit();
         received = consumer2.receive(1000);
 
-        assertNotNull("Consumer 2 should have received second message", 
received);
-        assertEquals("Unexpected group", received.getStringProperty("group"),
-                     "ONE");
+        assertNotNull("Consumer 2 should now have received second message", 
received);
+        assertEquals("Unexpected group", "ONE", 
received.getStringProperty("group"));
+        assertEquals("incorrect message received", 1, 
received.getIntProperty("msg"));
         assertTrue("Expected second message to be marked as redelivered " + 
received.getIntProperty("msg"),
                    received.getJMSRedelivered());
 
-        received.acknowledge();
+        cs2.commit();
 
 
         received = consumer2.receive(1000);
 
-        assertNotNull("Consumer 2 should have received third message", 
received);
-        assertEquals("Unexpected group", received.getStringProperty("group"),
-                     "ONE");
+        assertNotNull("Consumer 2 should have received a third message", 
received);
+        assertEquals("Unexpected group", "ONE", 
received.getStringProperty("group"));
+        assertEquals("incorrect message received", 2, 
received.getIntProperty("msg"));
 
-        received.acknowledge();
+        cs2.commit();
 
         received = consumer2.receive(1000);
 
-        assertNotNull("Consumer 2 should have received fourth message", 
received);
-        assertEquals("Unexpected group", received.getStringProperty("group"),
-                     "ONE");
+        assertNotNull("Consumer 2 should have received a fourth message", 
received);
+        assertEquals("Unexpected group", "ONE", 
received.getStringProperty("group"));
+        assertEquals("incorrect message received", 4, 
received.getIntProperty("msg"));
 
-        received.acknowledge();
+        cs2.commit();
 
 
         assertNull(consumer2.receive(1000));
     }
 
-    
+    public void testGroupAssignmentSurvivesEmpty() throws JMSException, 
AMQException
+    {
+        groupAssignmentOnEmpty(false);
+    }
+
+    public void testSharedGroupAssignmentDoesNotSurviveEmpty() throws 
JMSException, AMQException
+    {
+        groupAssignmentOnEmpty(true);
+    }
+
+    private void groupAssignmentOnEmpty(boolean sharedGroups) throws 
AMQException, JMSException
+    {
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("qpid.group_header_key","group");
+        if(sharedGroups)
+        {
+            arguments.put("qpid.shared_msg_group","1");
+        }
+
+        ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), 
true, false, false, arguments);
+        queue = (Queue) 
producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        producer = producerSession.createProducer(queue);
+
+        producer.send(createMessage(1, "ONE"));
+        producer.send(createMessage(2, "TWO"));
+        producer.send(createMessage(3, "THREE"));
+        producer.send(createMessage(4, "ONE"));
+
+        producerSession.commit();
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+        Session cs1 = ((AMQConnection)consumerConnection).createSession(true, 
Session.SESSION_TRANSACTED,1);
+        Session cs2 = ((AMQConnection)consumerConnection).createSession(true, 
Session.SESSION_TRANSACTED,1);
+
+
+        MessageConsumer consumer1 = cs1.createConsumer(queue);
+
+        consumerConnection.start();
+
+        MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+        Message received = consumer1.receive(1000);
+        assertNotNull("Consumer 1 should have received its first message", 
received);
+        assertEquals("incorrect message received", 1, 
received.getIntProperty("msg"));
+
+        received = consumer2.receive(1000);
+
+        assertNotNull("Consumer 2 should have received its first message", 
received);
+        assertEquals("incorrect message received", 2, 
received.getIntProperty("msg"));
+
+        cs1.commit();
+
+        received = consumer1.receive(1000);
+        assertNotNull("Consumer 1 should have received its second message", 
received);
+        assertEquals("incorrect message received", 3, 
received.getIntProperty("msg"));
+
+        // We expect different behaviours from "shared groups": with shared 
groups the assignment of a subscription to a group
+        // is terminated when there are no outstanding delivered but 
unacknowledged messages.  In contrast, with a
+        // standard message grouping queue the assignment will be retained 
until the subscription is no longer
+        // registered.
+        if(sharedGroups)
+        {
+            cs2.commit();
+            received = consumer2.receive(1000);
+
+            assertNotNull("Consumer 2 should have received its second 
message", received);
+            assertEquals("incorrect message received", 4, 
received.getIntProperty("msg"));
+
+            cs2.commit();
+        }
+        else
+        {
+            cs2.commit();
+            received = consumer2.receive(1000);
+
+            assertNull("Consumer 2 should not have received a second message", 
received);
+
+            cs1.commit();
+
+            received = consumer1.receive(1000);
+            assertNotNull("Consumer 1 should have received its third message", 
received);
+            assertEquals("incorrect message received", 4, 
received.getIntProperty("msg"));
+
+        }
+
+    }
+
+
     private Message createMessage(int msg, String group) throws JMSException
     {
         Message send = producerSession.createTextMessage("Message: " + msg);

Modified: qpid/trunk/qpid/java/test-profiles/CPPExcludes
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPExcludes?rev=1229996&r1=1229995&r2=1229996&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/CPPExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/CPPExcludes Wed Jan 11 13:10:36 2012
@@ -172,3 +172,9 @@ org.apache.qpid.server.management.AMQUse
 // QPID-3133: On 0-10, the exception listener is currently not invoked when 
reconnection fails to occur.
 org.apache.qpid.server.failover.FailoverMethodTest#*
 
+// CPP Broker does not implement non-"shared group" message groups
+org.apache.qpid.server.queue.MessageGroupQueueTest#testSimpleGroupAssignment
+org.apache.qpid.server.queue.MessageGroupQueueTest#testConsumerCloseGroupAssignment
+org.apache.qpid.server.queue.MessageGroupQueueTest#testConsumerCloseWithRelease
+org.apache.qpid.server.queue.MessageGroupQueueTest#testGroupAssignmentSurvivesEmpty
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to