Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
 Fri Feb 13 11:24:44 2009
@@ -25,10 +25,12 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
 import junit.framework.AssertionFailedError;
 
 public class AMQPriorityQueueTest extends SimpleAMQQueueTest
 {
+    private static final long MESSAGE_SIZE = 100L;
 
     @Override
     protected void setUp() throws Exception
@@ -92,11 +94,18 @@
 
     protected AMQMessage createMessage(Long id, byte i) throws AMQException
     {
-        AMQMessage msg = super.createMessage(id);
+        AMQMessage message = super.createMessage(id);
+
+        ContentHeaderBody header = new ContentHeaderBody();
+        header.bodySize = MESSAGE_SIZE;
+
+        //The createMessage above is for a Transient Message so it is safe to 
have no context.
+        message.setPublishAndContentHeaderBody(null, info, header);
+
         BasicContentHeaderProperties props = new 
BasicContentHeaderProperties();
         props.setPriority(i);
-        msg.getContentHeaderBody().properties = props;
-        return msg;
+        message.getContentHeaderBody().properties = props;
+        return message;
     }
     
     protected AMQMessage createMessage(Long id) throws AMQException

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 Fri Feb 13 11:24:44 2009
@@ -136,6 +136,7 @@
         while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
         {
             sendMessages(1, MAX_MESSAGE_SIZE);
+            System.err.println(_queue.getQueueDepth() + ":" + MAX_QUEUE_DEPTH);
         }
 
         Notification lastNotification = _queueMBean.getLastNotification();
@@ -307,7 +308,7 @@
             ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
             qs.add(_queue);
             messages[i].enqueue(qs);
-            messages[i].routingComplete(_messageStore, new 
MessageHandleFactory());
+            messages[i].routingComplete(_messageStore, new MessageFactory());
 
         }
 

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 Fri Feb 13 11:24:44 2009
@@ -221,7 +221,7 @@
         ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
         qs.add(_queue);
         msg.enqueue(qs);
-        msg.routingComplete(_messageStore, new MessageHandleFactory());
+        msg.routingComplete(_messageStore, new MessageFactory());
 
         msg.addContentBodyFrame(new ContentChunk()
         {
@@ -305,7 +305,7 @@
             currentMessage.enqueue(qs);
 
             // route header
-            currentMessage.routingComplete(_messageStore, new 
MessageHandleFactory());
+            currentMessage.routingComplete(_messageStore, new 
MessageFactory());
 
             // Add the body so we have somthing to test later
             currentMessage.addContentBodyFrame(

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
 Fri Feb 13 11:24:44 2009
@@ -98,7 +98,7 @@
                                                                       new 
LinkedList<RequiredDeliveryException>()
         );
         _queue.registerSubscription(_subscription,false);
-        MessageHandleFactory factory = new MessageHandleFactory();
+        MessageFactory factory = new MessageFactory();
         for (int i = 1; i <= count; i++)
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)

Copied: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java
 (from r744076, 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java)
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java?p2=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java&p1=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java&r1=744076&r2=744079&rev=744079&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java
 Fri Feb 13 11:24:44 2009
@@ -20,29 +20,29 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.store.MemoryMessageStore;
+import junit.framework.TestCase;
 
-public class WeakMessageHandleTest extends InMemoryMessageHandleTest
+public class MessageFactoryTest extends TestCase
 {
-    private MemoryMessageStore _messageStore;
+    private MessageFactory _factory;
 
     public void setUp()
     {
-        _messageStore = new MemoryMessageStore();
-        _messageStore.configure();
+        _factory = new MessageFactory();
     }
 
-    protected AMQMessageHandle newHandle(Long id)
+    public void testTransientMessageCreation()
     {
-        return new WeakReferenceMessageHandle(id, _messageStore);
+        AMQMessage message = _factory.createMessage(0L, null, false);
+
+        assertEquals("Transient Message creation does not return correct 
class.", TransientAMQMessage.class, message.getClass());
     }
 
-    @Override
-    public void testIsPersistent()
+    public void testPersistentMessageCreation()
     {
-        _handle = newHandle(1L);        
-        assertTrue(_handle.isPersistent());
-    }
+        AMQMessage message = _factory.createMessage(0L, null, true);
 
+        assertEquals("Transient Message creation does not return correct 
class.", PersistentAMQMessage.class, message.getClass());
+    }
 
-}
+}
\ No newline at end of file

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
 Fri Feb 13 11:24:44 2009
@@ -22,23 +22,13 @@
 
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 
-public class MockAMQMessage extends AMQMessage
+public class MockAMQMessage extends TransientAMQMessage
 {
     public MockAMQMessage(long messageId)
             throws AMQException
     {
-       super(new MockAMQMessageHandle(messageId) ,
-                (StoreContext)null,
-                (MessagePublishInfo)new MessagePublishInfoImpl());
-    }
-
-    protected MockAMQMessage(AMQMessage msg)
-            throws AMQException
-    {
-        super(msg);
+       super(messageId);
     }
 
 

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
 Fri Feb 13 11:24:44 2009
@@ -27,19 +27,16 @@
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.AMQException;
 import org.apache.commons.configuration.Configuration;
 
 import java.util.List;
 import java.util.Set;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.LinkedList;
 
 public class MockAMQQueue implements AMQQueue
 {
     private boolean _deleted = false;
+    private int _queueCount;
     private AMQShortString _name;
 
     public MockAMQQueue(String name)
@@ -47,11 +44,6 @@
        _name = new AMQShortString(name);
     }
 
-    public MockAMQQueue()
-    {
-       
-    }
-
     public AMQShortString getName()
     {
         return _name;
@@ -134,7 +126,7 @@
 
     public long getQueueDepth()
     {
-        return 0;  //To change body of implemented methods use File | Settings 
| File Templates.
+        return _queueCount;
     }
 
     public long getReceivedMessageCount()
@@ -159,6 +151,7 @@
 
     public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) 
throws AMQException
     {
+        _queueCount++;
         return null;  //To change body of implemented methods use File | 
Settings | File Templates.
     }
 
@@ -169,7 +162,7 @@
 
     public void dequeue(StoreContext storeContext, QueueEntry entry) throws 
FailedDequeueException
     {
-        //To change body of implemented methods use File | Settings | File 
Templates.
+        _queueCount--;
     }
 
     public boolean resend(QueueEntry entry, Subscription subscription) throws 
AMQException

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java
 Fri Feb 13 11:24:44 2009
@@ -20,14 +20,32 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.FixedSizeByteBufferAllocator;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 
 public class MockContentChunk implements ContentChunk
 {
+    public static final int DEFAULT_SIZE=0;
+
     private ByteBuffer _bytebuffer;
     private int _size;
 
+
+
+    public MockContentChunk()
+    {
+        this(0);
+    }
+
+    public MockContentChunk(int size)
+    {
+        FixedSizeByteBufferAllocator allocator = new 
FixedSizeByteBufferAllocator();
+        _bytebuffer = allocator.allocate(size, false);
+
+        _size = size;
+    }
+
     public MockContentChunk(ByteBuffer bytebuffer, int size)
     {
         _bytebuffer = bytebuffer;

Copied: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
 (from r744076, 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java)
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java?p2=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java&p1=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java&r1=744076&r2=744079&rev=744079&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
 Fri Feb 13 11:24:44 2009
@@ -21,8 +21,9 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
 
-public class WeakMessageHandleTest extends InMemoryMessageHandleTest
+public class PersistentMessageTest extends TransientMessageTest
 {
     private MemoryMessageStore _messageStore;
 
@@ -30,19 +31,20 @@
     {
         _messageStore = new MemoryMessageStore();
         _messageStore.configure();
+        _storeContext = new StoreContext();
     }
 
-    protected AMQMessageHandle newHandle(Long id)
+    @Override
+    protected AMQMessage newMessage(Long id)
     {
-        return new WeakReferenceMessageHandle(id, _messageStore);
+        return new MessageFactory().createMessage(id, _messageStore, true);
     }
 
     @Override
     public void testIsPersistent()
     {
-        _handle = newHandle(1L);        
-        assertTrue(_handle.isPersistent());
+        _message = newMessage(1L);
+        assertTrue(_message.isPersistent());
     }
 
-
 }

Copied: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
 (from r744076, 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java)
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java?p2=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java&p1=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java&r1=744076&r2=744079&rev=744079&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
 Fri Feb 13 11:24:44 2009
@@ -20,28 +20,29 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.store.MemoryMessageStore;
+import junit.framework.TestCase;
 
-public class WeakMessageHandleTest extends InMemoryMessageHandleTest
+public class QueueEntryImplTest extends TestCase
 {
-    private MemoryMessageStore _messageStore;
 
-    public void setUp()
+    /**
+     * Test the Redelivered state of a QueueEntryImpl
+     */
+    public void testRedelivered()
     {
-        _messageStore = new MemoryMessageStore();
-        _messageStore.configure();
-    }
+        QueueEntry entry = new QueueEntryImpl(null, null);
 
-    protected AMQMessageHandle newHandle(Long id)
-    {
-        return new WeakReferenceMessageHandle(id, _messageStore);
-    }
+        assertFalse("New message should not be redelivered", 
entry.isRedelivered());
+
+        entry.setRedelivered(true);
+
+        assertTrue("New message should not be redelivered", 
entry.isRedelivered());
+
+        //Check we can revert it.. not that we ever should.
+        entry.setRedelivered(false);
+
+        assertFalse("New message should not be redelivered", 
entry.isRedelivered());
 
-    @Override
-    public void testIsPersistent()
-    {
-        _handle = newHandle(1L);        
-        assertTrue(_handle.isPersistent());
     }
 
 

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 Fri Feb 13 11:24:44 2009
@@ -56,7 +56,8 @@
     protected FieldTable _arguments = null;
     
     MessagePublishInfo info = new MessagePublishInfoImpl();
-    
+    private static final long MESSAGE_SIZE = 100;
+
     @Override
     protected void setUp() throws Exception
     {
@@ -317,7 +318,7 @@
         // Send persistent message
         qs.add(_queue);
         msg.enqueue(qs);
-        msg.routingComplete(_store, new MessageHandleFactory());
+        msg.routingComplete(_store, new MessageFactory());
         _store.storeMessageMetaData(null, new Long(1L), new 
MessageMetaData(info, contentHeaderBody, 1));
         
         // Check that it is enqueued
@@ -326,9 +327,14 @@
         
         // Dequeue message
         MockQueueEntry entry = new MockQueueEntry();
-        AMQMessage amqmsg = new AMQMessage(1L, _store, new 
MessageHandleFactory(), txnContext);
+        AMQMessage message = new MessageFactory().createMessage(1L, _store, 
true);
         
-        entry.setMessage(amqmsg);
+        ContentHeaderBody header = new ContentHeaderBody();
+        header.bodySize = MESSAGE_SIZE;
+        // This is a persist message but we are not in a transaction so create 
a new context for the message
+        message.setPublishAndContentHeaderBody(new StoreContext(), info, 
header);
+        
+        entry.setMessage(message);
         _queue.dequeue(null, entry);
         
         // Check that it is dequeued
@@ -338,22 +344,19 @@
 
 
     // FIXME: move this to somewhere useful
-    private static AMQMessageHandle createMessageHandle(final long messageId, 
final MessagePublishInfo publishBody)
+    private static AMQMessage createMessage(final long messageId, final 
MessagePublishInfo publishBody)
     {
-        final AMQMessageHandle amqMessageHandle = (new 
MessageHandleFactory()).createMessageHandle(messageId,
-                                                                               
                    null,
-                                                                               
                    false);
+        final AMQMessage amqMessage = (new 
MessageFactory()).createMessage(messageId, null, false);
         try
         {
-            amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
-                                                              publishBody,
-                                                              new 
ContentHeaderBody()
-            {
-                public int getSize()
-                {
-                    return 1;
-                }
-            });
+            //Safe to use a null StoreContext as we have created a 
TransientMessage (see false param above)
+            amqMessage.setPublishAndContentHeaderBody( null, publishBody, new 
ContentHeaderBody()
+                    {
+                        public int getSize()
+                        {
+                            return 1;
+                        }
+                    });
         }
         catch (AMQException e)
         {
@@ -361,18 +364,18 @@
         }
 
 
-        return amqMessageHandle;
+        return amqMessage;
     }
 
-    public class TestMessage extends AMQMessage
+    public class TestMessage extends TransientAMQMessage
     {
         private final long _tag;
         private int _count;
 
-        TestMessage(long tag, long messageId, MessagePublishInfo publishBody, 
StoreContext storeContext)
+        TestMessage(long tag, long messageId, MessagePublishInfo publishBody)
                 throws AMQException
         {
-            super(createMessageHandle(messageId, publishBody), storeContext, 
publishBody);
+            super(createMessage(messageId, publishBody));
             _tag = tag;
         }
 
@@ -396,7 +399,8 @@
     
     protected AMQMessage createMessage(Long id) throws AMQException
     {
-        AMQMessage messageA = new TestMessage(id, id, info, new 
StoreContext());
+
+        AMQMessage messageA = new TestMessage(id, id, info);
         return messageA;
     }
 }

Added: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java?rev=744079&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
 (added)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
 Fri Feb 13 11:24:44 2009
@@ -0,0 +1,467 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TransientMessageTest extends TestCase
+{
+    AMQMessage _message;
+    StoreContext _storeContext = null;
+
+    protected AMQMessage newMessage(Long id)
+    {
+        return new MessageFactory().createMessage(id, null, false);
+    }
+
+    public void testMessageID()
+    {
+        Long id = 1L;
+        _message = newMessage(id);
+
+        assertEquals("Message not set value", id, _message.getMessageId());
+    }
+
+    public void testInvalidContentChunk()
+    {
+        _message = newMessage(1L);
+
+        try
+        {
+            _message.getContentChunk(0);
+            fail("getContentChunk should not succeed");
+        }
+        catch (RuntimeException e)
+        {
+            assertTrue(e.getMessage().equals("No ContentBody has been set"));
+        }
+
+        ContentChunk cc = new MockContentChunk(100);
+
+        try
+        {
+            _message.addContentBodyFrame(_storeContext, cc, false);
+        }
+        catch (AMQException e)
+        {
+            fail("AMQException thrown:" + e.getMessage());
+        }
+
+        try
+        {
+            _message.getContentChunk(-1);
+            fail("getContentChunk should not succeed");
+        }
+        catch (IllegalArgumentException e)
+        {
+            assertTrue(e.getMessage().contains("out of valid range"));
+        }
+
+        try
+        {
+            _message.getContentChunk(1);
+            fail("getContentChunk should not succeed");
+        }
+        catch (IllegalArgumentException e)
+        {
+            assertTrue(e.getMessage().contains("out of valid range"));
+        }
+    }
+
+    public void testAddSingleContentChunk()
+    {
+
+        _message = newMessage(1L);
+
+        ContentChunk cc = new MockContentChunk(100);
+
+        try
+        {
+            _message.addContentBodyFrame(_storeContext, cc, true);
+        }
+        catch (AMQException e)
+        {
+            fail("AMQException thrown:" + e.getMessage());
+        }
+
+        assertEquals("Incorrect body count", 1, _message.getBodyCount());
+
+        assertEquals("Incorrect ContentChunk returned.", cc, 
_message.getContentChunk(0));
+
+        cc = new MockContentChunk(100);
+
+        try
+        {
+            _message.addContentBodyFrame(_storeContext, cc, true);
+            fail("Exception should prevent adding two final chunks");
+        }
+        catch (UnsupportedOperationException e)
+        {
+            //normal path
+        }
+        catch (AMQException e)
+        {
+            fail("AMQException thrown:" + e.getMessage());
+        }
+
+    }
+
+    public void testAddMultipleContentChunk()
+    {
+
+        _message = newMessage(1L);
+
+        ContentChunk cc = new MockContentChunk(100);
+
+        try
+        {
+            _message.addContentBodyFrame(_storeContext, cc, false);
+        }
+        catch (AMQException e)
+        {
+            fail("AMQException thrown:" + e.getMessage());
+        }
+
+        assertEquals("Incorrect body count", 1, _message.getBodyCount());
+
+        assertEquals("Incorrect ContentChunk returned.", cc, 
_message.getContentChunk(0));
+
+        cc = new MockContentChunk(100);
+
+        try
+        {
+            _message.addContentBodyFrame(_storeContext, cc, true);
+        }
+        catch (AMQException e)
+        {
+            fail("AMQException thrown:" + e.getMessage());
+        }
+
+        assertEquals("Incorrect body count", 2, _message.getBodyCount());
+
+        assertEquals("Incorrect ContentChunk returned.", cc, 
_message.getContentChunk(1));
+
+    }
+
+    public void testInitialArrivalTime()
+    {
+        _message = newMessage(1L);
+
+        assertEquals("Initial Arrival time should be 0L", 0L, 
_message.getArrivalTime());
+    }
+
+    public void testSetPublishAndContentHeaderBody_WithBody()
+    {
+        _message = newMessage(1L);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl();
+        int bodySize = 100;
+
+        ContentHeaderBody chb = new ContentHeaderBody(0, 0, new 
BasicContentHeaderProperties(), bodySize);
+
+        try
+        {
+            _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+            assertEquals("BodySize not returned correctly. ", bodySize, 
_message.getSize());
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+    public void testSetPublishAndContentHeaderBody_Null()
+    {
+        _message = newMessage(1L);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl();
+        int bodySize = 0;
+
+        BasicContentHeaderProperties props = new 
BasicContentHeaderProperties();
+
+        props.setAppId("HandleTest");
+
+        try
+        {
+            _message.setPublishAndContentHeaderBody(_storeContext, mpi, null);
+            fail("setPublishAndContentHeaderBody with null ContentHeaederBody 
did not throw NPE.");
+        }
+        catch (NullPointerException npe)
+        {
+            assertEquals("HeaderBody cannot be null", npe.getMessage());
+        }
+        catch (AMQException e)
+        {
+            fail("setPublishAndContentHeaderBody should not throw AMQException 
here:" + e.getMessage());
+        }
+
+        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+        try
+        {
+            _message.setPublishAndContentHeaderBody(_storeContext, null, chb);
+            fail("setPublishAndContentHeaderBody with null MessagePublishInfo 
did not throw NPE.");
+        }
+        catch (NullPointerException npe)
+        {
+            assertEquals("PublishInfo cannot be null", npe.getMessage());
+        }
+        catch (AMQException e)
+        {
+            fail("setPublishAndContentHeaderBody should not throw AMQException 
here:" + e.getMessage());
+        }
+    }
+
+    public void testSetPublishAndContentHeaderBody_Empty()
+    {
+        _message = newMessage(1L);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl();
+        int bodySize = 0;
+
+        BasicContentHeaderProperties props = new 
BasicContentHeaderProperties();
+
+        props.setAppId("HandleTest");
+
+        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+        try
+        {
+            _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+            assertEquals("BodySize not returned correctly. ", bodySize, 
_message.getSize());
+
+            ContentHeaderBody retreived_chb = _message.getContentHeaderBody();
+
+            ContentHeaderProperties chp = retreived_chb.properties;
+
+            assertEquals("ContentHeaderBody not correct", chb, retreived_chb);
+
+            assertEquals("AppID not correctly retreived", "HandleTest",
+                         ((BasicContentHeaderProperties) 
chp).getAppIdAsString());
+
+            MessagePublishInfo retreived_mpi = 
_message.getMessagePublishInfo();
+
+            assertEquals("MessagePublishInfo not correct", mpi, retreived_mpi);
+
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+    public void testIsPersistent()
+    {
+        _message = newMessage(1L);
+
+        assertFalse(_message.isPersistent());
+    }
+
+    public void testImmediateAndNotDelivered()
+    {
+        _message = newMessage(1L);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, 
null);
+        int bodySize = 0;
+
+        BasicContentHeaderProperties props = new 
BasicContentHeaderProperties();
+
+        props.setAppId("HandleTest");
+
+        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+        try
+        {
+            _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+            assertTrue("Undelivered Immediate message should still be marked 
as so", _message.immediateAndNotDelivered());
+
+            assertFalse("Undelivered Message should not say it is delivered.", 
_message.getDeliveredToConsumer());
+
+            _message.setDeliveredToConsumer();
+
+            assertTrue("Delivered Message should say it is delivered.", 
_message.getDeliveredToConsumer());
+
+            assertFalse("Delivered Immediate message now be marked as so", 
_message.immediateAndNotDelivered());
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+    public void testNotImmediateAndNotDelivered()
+    {
+        _message = newMessage(1L);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, 
false, null);
+        int bodySize = 0;
+
+        BasicContentHeaderProperties props = new 
BasicContentHeaderProperties();
+
+        props.setAppId("HandleTest");
+
+        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+        try
+        {
+            _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+            assertFalse("Undelivered Non-Immediate message should not result 
in true.", _message.immediateAndNotDelivered());
+
+            assertFalse("Undelivered Message should not say it is delivered.", 
_message.getDeliveredToConsumer());
+
+            _message.setDeliveredToConsumer();
+
+            assertTrue("Delivered Message should say it is delivered.", 
_message.getDeliveredToConsumer());
+
+            assertFalse("Delivered Non-Immediate message not change this 
return", _message.immediateAndNotDelivered());
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+    public void testExpiry()
+    {
+        _message = newMessage(1L);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, 
false, null);
+        int bodySize = 0;
+
+        BasicContentHeaderProperties props = new 
BasicContentHeaderProperties();
+
+        props.setAppId("HandleTest");
+
+        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+        ReentrantLock waitLock = new ReentrantLock();
+        Condition wait = waitLock.newCondition();
+        try
+        {
+            _message.setExpiration(System.currentTimeMillis() + 10L);
+
+            _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+            assertFalse("New messages should not be expired.", 
_message.expired());
+
+            final long MILLIS =1000000L;
+            long waitTime = 20 * MILLIS;
+
+            while (waitTime > 0)
+            {
+                try
+                {
+                    waitLock.lock();
+
+                    waitTime = wait.awaitNanos(waitTime);
+                }
+                catch (InterruptedException e)
+                {
+                    //Stop if we are interrupted
+                    fail(e.getMessage());
+                }
+                finally
+                {
+                    waitLock.unlock();
+                }
+
+            }
+
+            assertTrue("After a sleep messages should now be expired.", 
_message.expired());
+
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+
+        public void testNoExpiry()
+    {
+        _message = newMessage(1L);
+
+        MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, 
false, null);
+        int bodySize = 0;
+
+        BasicContentHeaderProperties props = new 
BasicContentHeaderProperties();
+
+        props.setAppId("HandleTest");
+
+        ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+        ReentrantLock waitLock = new ReentrantLock();
+        Condition wait = waitLock.newCondition();
+        try
+        {
+
+            _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
+
+            assertFalse("New messages should not be expired.", 
_message.expired());
+
+            final long MILLIS =1000000L;
+            long waitTime = 10 * MILLIS;
+
+            while (waitTime > 0)
+            {
+                try
+                {
+                    waitLock.lock();
+
+                    waitTime = wait.awaitNanos(waitTime);
+                }
+                catch (InterruptedException e)
+                {
+                    //Stop if we are interrupted
+                    fail(e.getMessage());
+                }
+                finally
+                {
+                    waitLock.unlock();
+                }
+
+            }
+
+            assertFalse("After a sleep messages without an expiry should not 
expire.", _message.expired());
+
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+}

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
 Fri Feb 13 11:24:44 2009
@@ -30,7 +30,7 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.MessageFactory;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.AMQPriorityQueue;
 import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -389,7 +389,7 @@
 
         try
         {
-            currentMessage.routingComplete(_virtualHost.getMessageStore(), new 
MessageHandleFactory());
+            currentMessage.routingComplete(_virtualHost.getMessageStore(), new 
MessageFactory());
         }
         catch (AMQException e)
         {

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
 Fri Feb 13 11:24:44 2009
@@ -26,9 +26,8 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.queue.MessageFactory;
 import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.AMQMessageHandle;
 
 /**
  * Tests that reference counting works correctly with AMQMessage and the 
message store
@@ -56,10 +55,9 @@
         MessagePublishInfo info = new MessagePublishInfoImpl();
 
         final long messageId = _store.getNewMessageId();
-        AMQMessageHandle messageHandle = (new 
MessageHandleFactory()).createMessageHandle(messageId, _store, true);
-        messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb);
-        AMQMessage message = new AMQMessage(messageHandle,
-                                             _storeContext,info);
+
+        AMQMessage message = (new MessageFactory()).createMessage(messageId, 
_store, true);
+        message.setPublishAndContentHeaderBody(_storeContext, info, chb);
 
         message = message.takeReference();
 
@@ -88,18 +86,10 @@
 
         final Long messageId = _store.getNewMessageId();
         final ContentHeaderBody chb = createPersistentContentHeader();
-        AMQMessageHandle messageHandle = (new 
MessageHandleFactory()).createMessageHandle(messageId, _store, true);
-        messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb);
-        AMQMessage message = new AMQMessage(messageHandle,
-                                             _storeContext,
-                                            info);
-        
+        AMQMessage message = (new MessageFactory()).createMessage(messageId, 
_store, true);
+        message.setPublishAndContentHeaderBody(_storeContext, info, chb);
         
         message = message.takeReference();
-        // we call routing complete to set up the handle
-     //   message.routingComplete(_store, _storeContext, new 
MessageHandleFactory());
-
-
 
         assertEquals(1, _store.getMessageMetaDataMap().size());
         message = message.takeReference();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to