Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=744079&r1=744078&r2=744079&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Fri Feb 13 11:24:44 2009 @@ -25,10 +25,12 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.ContentHeaderBody; import junit.framework.AssertionFailedError; public class AMQPriorityQueueTest extends SimpleAMQQueueTest { + private static final long MESSAGE_SIZE = 100L; @Override protected void setUp() throws Exception @@ -92,11 +94,18 @@ protected AMQMessage createMessage(Long id, byte i) throws AMQException { - AMQMessage msg = super.createMessage(id); + AMQMessage message = super.createMessage(id); + + ContentHeaderBody header = new ContentHeaderBody(); + header.bodySize = MESSAGE_SIZE; + + //The createMessage above is for a Transient Message so it is safe to have no context. + message.setPublishAndContentHeaderBody(null, info, header); + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); props.setPriority(i); - msg.getContentHeaderBody().properties = props; - return msg; + message.getContentHeaderBody().properties = props; + return message; } protected AMQMessage createMessage(Long id) throws AMQException
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=744079&r1=744078&r2=744079&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Fri Feb 13 11:24:44 2009 @@ -136,6 +136,7 @@ while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH) { sendMessages(1, MAX_MESSAGE_SIZE); + System.err.println(_queue.getQueueDepth() + ":" + MAX_QUEUE_DEPTH); } Notification lastNotification = _queueMBean.getLastNotification(); @@ -307,7 +308,7 @@ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); messages[i].enqueue(qs); - messages[i].routingComplete(_messageStore, new MessageHandleFactory()); + messages[i].routingComplete(_messageStore, new MessageFactory()); } Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=744079&r1=744078&r2=744079&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Feb 13 11:24:44 2009 @@ -221,7 +221,7 @@ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_messageStore, new MessageHandleFactory()); + msg.routingComplete(_messageStore, new MessageFactory()); msg.addContentBodyFrame(new ContentChunk() { @@ -305,7 +305,7 @@ currentMessage.enqueue(qs); // route header - currentMessage.routingComplete(_messageStore, new MessageHandleFactory()); + currentMessage.routingComplete(_messageStore, new MessageFactory()); // Add the body so we have somthing to test later currentMessage.addContentBodyFrame( Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=744079&r1=744078&r2=744079&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Fri Feb 13 11:24:44 2009 @@ -98,7 +98,7 @@ new LinkedList<RequiredDeliveryException>() ); _queue.registerSubscription(_subscription,false); - MessageHandleFactory factory = new MessageHandleFactory(); + MessageFactory factory = new MessageFactory(); for (int i = 1; i <= count; i++) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) Copied: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java (from r744076, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java) URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java?p2=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java&p1=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java&r1=744076&r2=744079&rev=744079&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java Fri Feb 13 11:24:44 2009 @@ -20,29 +20,29 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.store.MemoryMessageStore; +import junit.framework.TestCase; -public class WeakMessageHandleTest extends InMemoryMessageHandleTest +public class MessageFactoryTest extends TestCase { - private MemoryMessageStore _messageStore; + private MessageFactory _factory; public void setUp() { - _messageStore = new MemoryMessageStore(); - _messageStore.configure(); + _factory = new MessageFactory(); } - protected AMQMessageHandle newHandle(Long id) + public void testTransientMessageCreation() { - return new WeakReferenceMessageHandle(id, _messageStore); + AMQMessage message = _factory.createMessage(0L, null, false); + + assertEquals("Transient Message creation does not return correct class.", TransientAMQMessage.class, message.getClass()); } - @Override - public void testIsPersistent() + public void testPersistentMessageCreation() { - _handle = newHandle(1L); - assertTrue(_handle.isPersistent()); - } + AMQMessage message = _factory.createMessage(0L, null, true); + assertEquals("Transient Message creation does not return correct class.", PersistentAMQMessage.class, message.getClass()); + } -} +} \ No newline at end of file Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=744079&r1=744078&r2=744079&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java Fri Feb 13 11:24:44 2009 @@ -22,23 +22,13 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -public class MockAMQMessage extends AMQMessage +public class MockAMQMessage extends TransientAMQMessage { public MockAMQMessage(long messageId) throws AMQException { - super(new MockAMQMessageHandle(messageId) , - (StoreContext)null, - (MessagePublishInfo)new MessagePublishInfoImpl()); - } - - protected MockAMQMessage(AMQMessage msg) - throws AMQException - { - super(msg); + super(messageId); } Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=744079&r1=744078&r2=744079&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Fri Feb 13 11:24:44 2009 @@ -27,19 +27,16 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.AMQException; import org.apache.commons.configuration.Configuration; import java.util.List; import java.util.Set; -import java.util.Map; -import java.util.HashMap; -import java.util.LinkedList; public class MockAMQQueue implements AMQQueue { private boolean _deleted = false; + private int _queueCount; private AMQShortString _name; public MockAMQQueue(String name) @@ -47,11 +44,6 @@ _name = new AMQShortString(name); } - public MockAMQQueue() - { - - } - public AMQShortString getName() { return _name; @@ -134,7 +126,7 @@ public long getQueueDepth() { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return _queueCount; } public long getReceivedMessageCount() @@ -159,6 +151,7 @@ public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException { + _queueCount++; return null; //To change body of implemented methods use File | Settings | File Templates. } @@ -169,7 +162,7 @@ public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException { - //To change body of implemented methods use File | Settings | File Templates. + _queueCount--; } public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java?rev=744079&r1=744078&r2=744079&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java Fri Feb 13 11:24:44 2009 @@ -20,14 +20,32 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.FixedSizeByteBufferAllocator; +import org.apache.qpid.framing.abstraction.ContentChunk; public class MockContentChunk implements ContentChunk { + public static final int DEFAULT_SIZE=0; + private ByteBuffer _bytebuffer; private int _size; + + + public MockContentChunk() + { + this(0); + } + + public MockContentChunk(int size) + { + FixedSizeByteBufferAllocator allocator = new FixedSizeByteBufferAllocator(); + _bytebuffer = allocator.allocate(size, false); + + _size = size; + } + public MockContentChunk(ByteBuffer bytebuffer, int size) { _bytebuffer = bytebuffer; Copied: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java (from r744076, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java) URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java?p2=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java&p1=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java&r1=744076&r2=744079&rev=744079&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java Fri Feb 13 11:24:44 2009 @@ -21,8 +21,9 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; -public class WeakMessageHandleTest extends InMemoryMessageHandleTest +public class PersistentMessageTest extends TransientMessageTest { private MemoryMessageStore _messageStore; @@ -30,19 +31,20 @@ { _messageStore = new MemoryMessageStore(); _messageStore.configure(); + _storeContext = new StoreContext(); } - protected AMQMessageHandle newHandle(Long id) + @Override + protected AMQMessage newMessage(Long id) { - return new WeakReferenceMessageHandle(id, _messageStore); + return new MessageFactory().createMessage(id, _messageStore, true); } @Override public void testIsPersistent() { - _handle = newHandle(1L); - assertTrue(_handle.isPersistent()); + _message = newMessage(1L); + assertTrue(_message.isPersistent()); } - } Copied: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java (from r744076, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java) URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java?p2=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java&p1=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java&r1=744076&r2=744079&rev=744079&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java Fri Feb 13 11:24:44 2009 @@ -20,28 +20,29 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.store.MemoryMessageStore; +import junit.framework.TestCase; -public class WeakMessageHandleTest extends InMemoryMessageHandleTest +public class QueueEntryImplTest extends TestCase { - private MemoryMessageStore _messageStore; - public void setUp() + /** + * Test the Redelivered state of a QueueEntryImpl + */ + public void testRedelivered() { - _messageStore = new MemoryMessageStore(); - _messageStore.configure(); - } + QueueEntry entry = new QueueEntryImpl(null, null); - protected AMQMessageHandle newHandle(Long id) - { - return new WeakReferenceMessageHandle(id, _messageStore); - } + assertFalse("New message should not be redelivered", entry.isRedelivered()); + + entry.setRedelivered(true); + + assertTrue("New message should not be redelivered", entry.isRedelivered()); + + //Check we can revert it.. not that we ever should. + entry.setRedelivered(false); + + assertFalse("New message should not be redelivered", entry.isRedelivered()); - @Override - public void testIsPersistent() - { - _handle = newHandle(1L); - assertTrue(_handle.isPersistent()); } Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=744079&r1=744078&r2=744079&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Feb 13 11:24:44 2009 @@ -56,7 +56,8 @@ protected FieldTable _arguments = null; MessagePublishInfo info = new MessagePublishInfoImpl(); - + private static final long MESSAGE_SIZE = 100; + @Override protected void setUp() throws Exception { @@ -317,7 +318,7 @@ // Send persistent message qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_store, new MessageHandleFactory()); + msg.routingComplete(_store, new MessageFactory()); _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1)); // Check that it is enqueued @@ -326,9 +327,14 @@ // Dequeue message MockQueueEntry entry = new MockQueueEntry(); - AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext); + AMQMessage message = new MessageFactory().createMessage(1L, _store, true); - entry.setMessage(amqmsg); + ContentHeaderBody header = new ContentHeaderBody(); + header.bodySize = MESSAGE_SIZE; + // This is a persist message but we are not in a transaction so create a new context for the message + message.setPublishAndContentHeaderBody(new StoreContext(), info, header); + + entry.setMessage(message); _queue.dequeue(null, entry); // Check that it is dequeued @@ -338,22 +344,19 @@ // FIXME: move this to somewhere useful - private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) + private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody) { - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - null, - false); + final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId, null, false); try { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), - publishBody, - new ContentHeaderBody() - { - public int getSize() - { - return 1; - } - }); + //Safe to use a null StoreContext as we have created a TransientMessage (see false param above) + amqMessage.setPublishAndContentHeaderBody( null, publishBody, new ContentHeaderBody() + { + public int getSize() + { + return 1; + } + }); } catch (AMQException e) { @@ -361,18 +364,18 @@ } - return amqMessageHandle; + return amqMessage; } - public class TestMessage extends AMQMessage + public class TestMessage extends TransientAMQMessage { private final long _tag; private int _count; - TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) + TestMessage(long tag, long messageId, MessagePublishInfo publishBody) throws AMQException { - super(createMessageHandle(messageId, publishBody), storeContext, publishBody); + super(createMessage(messageId, publishBody)); _tag = tag; } @@ -396,7 +399,8 @@ protected AMQMessage createMessage(Long id) throws AMQException { - AMQMessage messageA = new TestMessage(id, id, info, new StoreContext()); + + AMQMessage messageA = new TestMessage(id, id, info); return messageA; } } Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java?rev=744079&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java (added) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java Fri Feb 13 11:24:44 2009 @@ -0,0 +1,467 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.ContentHeaderProperties; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.server.store.StoreContext; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public class TransientMessageTest extends TestCase +{ + AMQMessage _message; + StoreContext _storeContext = null; + + protected AMQMessage newMessage(Long id) + { + return new MessageFactory().createMessage(id, null, false); + } + + public void testMessageID() + { + Long id = 1L; + _message = newMessage(id); + + assertEquals("Message not set value", id, _message.getMessageId()); + } + + public void testInvalidContentChunk() + { + _message = newMessage(1L); + + try + { + _message.getContentChunk(0); + fail("getContentChunk should not succeed"); + } + catch (RuntimeException e) + { + assertTrue(e.getMessage().equals("No ContentBody has been set")); + } + + ContentChunk cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, false); + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + try + { + _message.getContentChunk(-1); + fail("getContentChunk should not succeed"); + } + catch (IllegalArgumentException e) + { + assertTrue(e.getMessage().contains("out of valid range")); + } + + try + { + _message.getContentChunk(1); + fail("getContentChunk should not succeed"); + } + catch (IllegalArgumentException e) + { + assertTrue(e.getMessage().contains("out of valid range")); + } + } + + public void testAddSingleContentChunk() + { + + _message = newMessage(1L); + + ContentChunk cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, true); + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + assertEquals("Incorrect body count", 1, _message.getBodyCount()); + + assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(0)); + + cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, true); + fail("Exception should prevent adding two final chunks"); + } + catch (UnsupportedOperationException e) + { + //normal path + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + } + + public void testAddMultipleContentChunk() + { + + _message = newMessage(1L); + + ContentChunk cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, false); + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + assertEquals("Incorrect body count", 1, _message.getBodyCount()); + + assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(0)); + + cc = new MockContentChunk(100); + + try + { + _message.addContentBodyFrame(_storeContext, cc, true); + } + catch (AMQException e) + { + fail("AMQException thrown:" + e.getMessage()); + } + + assertEquals("Incorrect body count", 2, _message.getBodyCount()); + + assertEquals("Incorrect ContentChunk returned.", cc, _message.getContentChunk(1)); + + } + + public void testInitialArrivalTime() + { + _message = newMessage(1L); + + assertEquals("Initial Arrival time should be 0L", 0L, _message.getArrivalTime()); + } + + public void testSetPublishAndContentHeaderBody_WithBody() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + int bodySize = 100; + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, new BasicContentHeaderProperties(), bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertEquals("BodySize not returned correctly. ", bodySize, _message.getSize()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void testSetPublishAndContentHeaderBody_Null() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, null); + fail("setPublishAndContentHeaderBody with null ContentHeaederBody did not throw NPE."); + } + catch (NullPointerException npe) + { + assertEquals("HeaderBody cannot be null", npe.getMessage()); + } + catch (AMQException e) + { + fail("setPublishAndContentHeaderBody should not throw AMQException here:" + e.getMessage()); + } + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, null, chb); + fail("setPublishAndContentHeaderBody with null MessagePublishInfo did not throw NPE."); + } + catch (NullPointerException npe) + { + assertEquals("PublishInfo cannot be null", npe.getMessage()); + } + catch (AMQException e) + { + fail("setPublishAndContentHeaderBody should not throw AMQException here:" + e.getMessage()); + } + } + + public void testSetPublishAndContentHeaderBody_Empty() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertEquals("BodySize not returned correctly. ", bodySize, _message.getSize()); + + ContentHeaderBody retreived_chb = _message.getContentHeaderBody(); + + ContentHeaderProperties chp = retreived_chb.properties; + + assertEquals("ContentHeaderBody not correct", chb, retreived_chb); + + assertEquals("AppID not correctly retreived", "HandleTest", + ((BasicContentHeaderProperties) chp).getAppIdAsString()); + + MessagePublishInfo retreived_mpi = _message.getMessagePublishInfo(); + + assertEquals("MessagePublishInfo not correct", mpi, retreived_mpi); + + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void testIsPersistent() + { + _message = newMessage(1L); + + assertFalse(_message.isPersistent()); + } + + public void testImmediateAndNotDelivered() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertTrue("Undelivered Immediate message should still be marked as so", _message.immediateAndNotDelivered()); + + assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer()); + + _message.setDeliveredToConsumer(); + + assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer()); + + assertFalse("Delivered Immediate message now be marked as so", _message.immediateAndNotDelivered()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void testNotImmediateAndNotDelivered() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + try + { + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertFalse("Undelivered Non-Immediate message should not result in true.", _message.immediateAndNotDelivered()); + + assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer()); + + _message.setDeliveredToConsumer(); + + assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer()); + + assertFalse("Delivered Non-Immediate message not change this return", _message.immediateAndNotDelivered()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + public void testExpiry() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + ReentrantLock waitLock = new ReentrantLock(); + Condition wait = waitLock.newCondition(); + try + { + _message.setExpiration(System.currentTimeMillis() + 10L); + + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertFalse("New messages should not be expired.", _message.expired()); + + final long MILLIS =1000000L; + long waitTime = 20 * MILLIS; + + while (waitTime > 0) + { + try + { + waitLock.lock(); + + waitTime = wait.awaitNanos(waitTime); + } + catch (InterruptedException e) + { + //Stop if we are interrupted + fail(e.getMessage()); + } + finally + { + waitLock.unlock(); + } + + } + + assertTrue("After a sleep messages should now be expired.", _message.expired()); + + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + + public void testNoExpiry() + { + _message = newMessage(1L); + + MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null); + int bodySize = 0; + + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + props.setAppId("HandleTest"); + + ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize); + + ReentrantLock waitLock = new ReentrantLock(); + Condition wait = waitLock.newCondition(); + try + { + + _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb); + + assertFalse("New messages should not be expired.", _message.expired()); + + final long MILLIS =1000000L; + long waitTime = 10 * MILLIS; + + while (waitTime > 0) + { + try + { + waitLock.lock(); + + waitTime = wait.awaitNanos(waitTime); + } + catch (InterruptedException e) + { + //Stop if we are interrupted + fail(e.getMessage()); + } + finally + { + waitLock.unlock(); + } + + } + + assertFalse("After a sleep messages without an expiry should not expire.", _message.expired()); + + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + +} Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=744079&r1=744078&r2=744079&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Fri Feb 13 11:24:44 2009 @@ -30,7 +30,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.SimpleAMQQueue; @@ -389,7 +389,7 @@ try { - currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory()); + currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageFactory()); } catch (AMQException e) { Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=744079&r1=744078&r2=744079&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Fri Feb 13 11:24:44 2009 @@ -26,9 +26,8 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.AMQMessageHandle; /** * Tests that reference counting works correctly with AMQMessage and the message store @@ -56,10 +55,9 @@ MessagePublishInfo info = new MessagePublishInfoImpl(); final long messageId = _store.getNewMessageId(); - AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); - messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb); - AMQMessage message = new AMQMessage(messageHandle, - _storeContext,info); + + AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true); + message.setPublishAndContentHeaderBody(_storeContext, info, chb); message = message.takeReference(); @@ -88,18 +86,10 @@ final Long messageId = _store.getNewMessageId(); final ContentHeaderBody chb = createPersistentContentHeader(); - AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); - messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb); - AMQMessage message = new AMQMessage(messageHandle, - _storeContext, - info); - + AMQMessage message = (new MessageFactory()).createMessage(messageId, _store, true); + message.setPublishAndContentHeaderBody(_storeContext, info, chb); message = message.takeReference(); - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - - assertEquals(1, _store.getMessageMetaDataMap().size()); message = message.takeReference(); --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org