Repository: qpid-jms
Updated Branches:
  refs/heads/master 43c8be481 -> 6d0fcad96


Add message queue tests

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6d0fcad9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6d0fcad9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6d0fcad9

Branch: refs/heads/master
Commit: 6d0fcad9641e2818691ccfba663fe893a5ceff47
Parents: 43c8be4
Author: Timothy Bish <tabish...@gmail.com>
Authored: Mon Oct 6 18:35:26 2014 -0400
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Mon Oct 6 18:35:26 2014 -0400

----------------------------------------------------------------------
 .../qpid/jms/util/FifoMessageQueueTest.java     | 312 +++++++++++++++++++
 .../qpid/jms/util/PriorityMessageQueueTest.java | 306 ++++++++++++++++++
 2 files changed, 618 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6d0fcad9/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
new file mode 100644
index 0000000..4bc7908
--- /dev/null
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultMessageFacade;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the FIFO based message queue.
+ */
+public class FifoMessageQueueTest {
+
+    private MessageQueue queue;
+    private final IdGenerator messageId = new IdGenerator();
+    private long sequence;
+
+    @Before
+    public void setUp() {
+        queue = new FifoMessageQueue();
+        queue.start();
+    }
+
+    @Test
+    public void testToString() {
+        assertNotNull(queue.toString());
+    }
+
+    @Test
+    public void testGetLock() {
+        assertNotNull(queue.getLock());
+    }
+
+    @Test
+    public void testCreate() {
+        FifoMessageQueue queue = new FifoMessageQueue();
+
+        assertFalse(queue.isClosed());
+        assertTrue(queue.isEmpty());
+        assertFalse(queue.isRunning());
+
+        assertEquals(0, queue.size());
+    }
+
+    @Test
+    public void testClose() {
+        assertFalse(queue.isClosed());
+        assertTrue(queue.isRunning());
+        queue.close();
+        assertTrue(queue.isClosed());
+        assertFalse(queue.isRunning());
+        queue.close();
+    }
+
+    @Test
+    public void testDequeueNoWaitWhenQueueIsClosed() {
+        JmsInboundMessageDispatch message = createEnvelope();
+        queue.enqueueFirst(message);
+
+        assertFalse(queue.isEmpty());
+        queue.close();
+        assertSame(null, queue.dequeueNoWait());
+    }
+
+    @Test
+    public void testDequeueWhenQueueIsClosed() throws InterruptedException {
+        JmsInboundMessageDispatch message = createEnvelope();
+        queue.enqueueFirst(message);
+
+        assertFalse(queue.isEmpty());
+        queue.close();
+        assertSame(null, queue.dequeue(1L));
+    }
+
+    @Test
+    public void testDequeueWhenQueueIsStopped() throws InterruptedException {
+        JmsInboundMessageDispatch message = createEnvelope();
+        queue.enqueueFirst(message);
+
+        assertFalse(queue.isEmpty());
+        queue.stop();
+        assertFalse(queue.isRunning());
+        assertSame(null, queue.dequeue(1L));
+        queue.start();
+        assertTrue(queue.isRunning());
+        assertSame(message, queue.dequeue(1L));
+    }
+
+    @Test
+    public void testDequeueNoWaitWhenQueueIsStopped() {
+        JmsInboundMessageDispatch message = createEnvelope();
+        queue.enqueueFirst(message);
+
+        assertFalse(queue.isEmpty());
+        queue.stop();
+        assertFalse(queue.isRunning());
+        assertSame(null, queue.dequeueNoWait());
+        queue.start();
+        assertTrue(queue.isRunning());
+        assertSame(message, queue.dequeueNoWait());
+    }
+
+    @Test
+    public void testEnqueueFirst() {
+        JmsInboundMessageDispatch message1 = createEnvelope();
+        JmsInboundMessageDispatch message2 = createEnvelope();
+        JmsInboundMessageDispatch message3 = createEnvelope();
+
+        queue.enqueueFirst(message1);
+        queue.enqueueFirst(message2);
+        queue.enqueueFirst(message3);
+
+        assertSame(message3, queue.dequeueNoWait());
+        assertSame(message2, queue.dequeueNoWait());
+        assertSame(message1, queue.dequeueNoWait());
+    }
+
+    @Test
+    public void testClear() {
+        List<JmsInboundMessageDispatch> messages = 
createFullRangePrioritySet();
+
+        for (JmsInboundMessageDispatch envelope: messages) {
+            queue.enqueue(envelope);
+        }
+
+        assertFalse(queue.isEmpty());
+        queue.clear();
+        assertTrue(queue.isEmpty());
+    }
+
+    @Test
+    public void testRemoveAll() throws JMSException {
+        List<JmsInboundMessageDispatch> messages = 
createFullRangePrioritySet();
+        Collections.shuffle(messages);
+
+        for (JmsInboundMessageDispatch envelope: messages) {
+            queue.enqueue(envelope);
+        }
+
+        assertFalse(queue.isEmpty());
+        List<JmsInboundMessageDispatch> result = queue.removeAll();
+        assertTrue(queue.isEmpty());
+
+        assertEquals(10, result.size());
+
+        for (byte i = 0; i < 10; ++i) {
+            assertEquals(result.get(i), messages.get(i));
+        }
+    }
+
+    @Test
+    public void testRemoveFirstOnEmptyQueue() {
+        assertNull(queue.dequeueNoWait());
+    }
+
+    @Test
+    public void testRemoveFirst() throws JMSException {
+        List<JmsInboundMessageDispatch> messages = 
createFullRangePrioritySet();
+        Collections.shuffle(messages);
+
+        for (JmsInboundMessageDispatch envelope: messages) {
+            queue.enqueue(envelope);
+        }
+
+        for (byte i = 0; i < 10; ++i) {
+            JmsInboundMessageDispatch first = queue.dequeueNoWait();
+            assertEquals(first, messages.get(i));
+        }
+
+        assertTrue(queue.isEmpty());
+    }
+
+    @Test
+    public void testRemoveFirstSparse() throws JMSException {
+        queue.enqueue(createEnvelope(9));
+        queue.enqueue(createEnvelope(4));
+        queue.enqueue(createEnvelope(1));
+
+        JmsInboundMessageDispatch envelope = queue.dequeueNoWait();
+        assertEquals(9, envelope.getMessage().getJMSPriority());
+        envelope = queue.dequeueNoWait();
+        assertEquals(4, envelope.getMessage().getJMSPriority());
+        envelope = queue.dequeueNoWait();
+        assertEquals(1, envelope.getMessage().getJMSPriority());
+
+        assertTrue(queue.isEmpty());
+    }
+
+    @Test
+    public void testPeekOnEmptyQueue() {
+        assertNull(queue.peek());
+    }
+
+    @Test
+    public void testPeekFirst() throws JMSException {
+        List<JmsInboundMessageDispatch> messages = 
createFullRangePrioritySet();
+        Collections.shuffle(messages);
+
+        for (JmsInboundMessageDispatch envelope: messages) {
+            queue.enqueue(envelope);
+        }
+
+        for (byte i = 0; i < 10; ++i) {
+            JmsInboundMessageDispatch first = queue.peek();
+            assertEquals(first, messages.get(i));
+            queue.dequeueNoWait();
+        }
+
+        assertTrue(queue.isEmpty());
+    }
+
+    @Test
+    public void testPeekFirstSparse() throws JMSException {
+        queue.enqueue(createEnvelope(9));
+        queue.enqueue(createEnvelope(4));
+        queue.enqueue(createEnvelope(1));
+
+        JmsInboundMessageDispatch envelope = queue.peek();
+        assertEquals(9, envelope.getMessage().getJMSPriority());
+        queue.dequeueNoWait();
+        envelope = queue.peek();
+        assertEquals(4, envelope.getMessage().getJMSPriority());
+        queue.dequeueNoWait();
+        envelope = queue.peek();
+        assertEquals(1, envelope.getMessage().getJMSPriority());
+        queue.dequeueNoWait();
+
+        assertTrue(queue.isEmpty());
+    }
+
+    @Test
+    public void testDequeueWaitsUntilMessageArrives() throws 
InterruptedException {
+        final JmsInboundMessageDispatch message = createEnvelope();
+        Thread runner = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    TimeUnit.SECONDS.sleep(2);
+                } catch (InterruptedException e) {
+                }
+                queue.enqueueFirst(message);
+            }
+        });
+        runner.start();
+
+        assertSame(message, queue.dequeue(-1));
+    }
+
+    private List<JmsInboundMessageDispatch> createFullRangePrioritySet() {
+        List<JmsInboundMessageDispatch> messages = new 
ArrayList<JmsInboundMessageDispatch>();
+        for (int i = 0; i < 10; ++i) {
+            messages.add(createEnvelope(i));
+        }
+        return messages;
+    }
+
+    private JmsInboundMessageDispatch createEnvelope() {
+        JmsInboundMessageDispatch envelope = new 
JmsInboundMessageDispatch(sequence++);
+        envelope.setMessage(createMessage());
+        return envelope;
+    }
+
+    private JmsInboundMessageDispatch createEnvelope(int priority) {
+        JmsInboundMessageDispatch envelope = new 
JmsInboundMessageDispatch(sequence++);
+        envelope.setMessage(createMessage(priority));
+        return envelope;
+    }
+
+    private JmsMessage createMessage() {
+        return createMessage(4);
+    }
+
+    private JmsMessage createMessage(int priority) {
+        JmsDefaultMessageFacade facade = new JmsDefaultMessageFacade();
+        facade.setMessageId(messageId.generateId());
+        facade.setPriority((byte) priority);
+        JmsMessage message = new JmsMessage(facade);
+
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6d0fcad9/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/PriorityMessageQueueTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/PriorityMessageQueueTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/PriorityMessageQueueTest.java
new file mode 100644
index 0000000..ee0b4b9
--- /dev/null
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/PriorityMessageQueueTest.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultMessageFacade;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for the priority based message Queue
+ */
+public class PriorityMessageQueueTest {
+
+    private MessageQueue queue;
+    private final IdGenerator messageId = new IdGenerator();
+    private long sequence;
+
+    @Before
+    public void setUp() {
+        queue = new PriorityMessageQueue();
+        queue.start();
+    }
+
+    @Test
+    public void testGetLock() {
+        assertNotNull(queue.getLock());
+    }
+
+    @Test
+    public void testCreate() {
+        PriorityMessageQueue queue = new PriorityMessageQueue();
+
+        assertFalse(queue.isClosed());
+        assertTrue(queue.isEmpty());
+        assertFalse(queue.isRunning());
+
+        assertEquals(0, queue.size());
+    }
+
+    @Test
+    public void testClose() {
+        assertFalse(queue.isClosed());
+        assertTrue(queue.isRunning());
+        queue.close();
+        assertTrue(queue.isClosed());
+        assertFalse(queue.isRunning());
+        queue.close();
+    }
+
+    @Test
+    public void testDequeueNoWaitWhenQueueIsClosed() {
+        JmsInboundMessageDispatch message = createEnvelope();
+        queue.enqueueFirst(message);
+
+        assertFalse(queue.isEmpty());
+        queue.close();
+        assertSame(null, queue.dequeueNoWait());
+    }
+
+    @Test
+    public void testDequeueWhenQueueIsClosed() throws InterruptedException {
+        JmsInboundMessageDispatch message = createEnvelope();
+        queue.enqueueFirst(message);
+
+        assertFalse(queue.isEmpty());
+        queue.close();
+        assertSame(null, queue.dequeue(1L));
+    }
+
+    @Test
+    public void testDequeueWhenQueueIsStopped() throws InterruptedException {
+        JmsInboundMessageDispatch message = createEnvelope();
+        queue.enqueueFirst(message);
+
+        assertFalse(queue.isEmpty());
+        queue.stop();
+        assertFalse(queue.isRunning());
+        assertSame(null, queue.dequeue(1L));
+        queue.start();
+        assertTrue(queue.isRunning());
+        assertSame(message, queue.dequeue(1L));
+    }
+
+    @Test
+    public void testDequeueNoWaitWhenQueueIsStopped() {
+        JmsInboundMessageDispatch message = createEnvelope();
+        queue.enqueueFirst(message);
+
+        assertFalse(queue.isEmpty());
+        queue.stop();
+        assertFalse(queue.isRunning());
+        assertSame(null, queue.dequeueNoWait());
+        queue.start();
+        assertTrue(queue.isRunning());
+        assertSame(message, queue.dequeueNoWait());
+    }
+
+    @Test
+    public void testEnqueueFirst() {
+        JmsInboundMessageDispatch message1 = createEnvelope();
+        JmsInboundMessageDispatch message2 = createEnvelope();
+        JmsInboundMessageDispatch message3 = createEnvelope();
+
+        queue.enqueueFirst(message1);
+        queue.enqueueFirst(message2);
+        queue.enqueueFirst(message3);
+
+        assertSame(message3, queue.dequeueNoWait());
+        assertSame(message2, queue.dequeueNoWait());
+        assertSame(message1, queue.dequeueNoWait());
+    }
+
+    @Test
+    public void testClear() {
+        List<JmsInboundMessageDispatch> messages = 
createFullRangePrioritySet();
+
+        for (JmsInboundMessageDispatch envelope: messages) {
+            queue.enqueue(envelope);
+        }
+
+        assertFalse(queue.isEmpty());
+        queue.clear();
+        assertTrue(queue.isEmpty());
+    }
+
+    @Test
+    public void testRemoveAll() throws JMSException {
+        List<JmsInboundMessageDispatch> messages = 
createFullRangePrioritySet();
+        Collections.shuffle(messages);
+
+        for (JmsInboundMessageDispatch envelope: messages) {
+            queue.enqueue(envelope);
+        }
+
+        assertFalse(queue.isEmpty());
+        List<JmsInboundMessageDispatch> result = queue.removeAll();
+        assertTrue(queue.isEmpty());
+
+        assertEquals(10, result.size());
+
+        for (byte i = 9; i >= 0; --i) {
+            JmsInboundMessageDispatch envelope = result.remove(0);
+            assertEquals(i, envelope.getMessage().getJMSPriority());
+        }
+    }
+
+    @Test
+    public void testRemoveFirstOnEmptyQueue() {
+        assertNull(queue.dequeueNoWait());
+    }
+
+    @Test
+    public void testRemoveFirst() throws JMSException {
+        List<JmsInboundMessageDispatch> messages = 
createFullRangePrioritySet();
+
+        for (JmsInboundMessageDispatch envelope: messages) {
+            queue.enqueue(envelope);
+        }
+
+        for (byte i = 9; i >= 0; --i) {
+            JmsInboundMessageDispatch first = queue.dequeueNoWait();
+            assertEquals(i, first.getMessage().getJMSPriority());
+        }
+
+        assertTrue(queue.isEmpty());
+    }
+
+    @Test
+    public void testRemoveFirstSparse() throws JMSException {
+        queue.enqueue(createEnvelope(9));
+        queue.enqueue(createEnvelope(4));
+        queue.enqueue(createEnvelope(1));
+
+        JmsInboundMessageDispatch envelope = queue.dequeueNoWait();
+        assertEquals(9, envelope.getMessage().getJMSPriority());
+        envelope = queue.dequeueNoWait();
+        assertEquals(4, envelope.getMessage().getJMSPriority());
+        envelope = queue.dequeueNoWait();
+        assertEquals(1, envelope.getMessage().getJMSPriority());
+
+        assertTrue(queue.isEmpty());
+    }
+
+    @Test
+    public void testPeekOnEmptyQueue() {
+        assertNull(queue.peek());
+    }
+
+    @Test
+    public void testPeekFirst() throws JMSException {
+        List<JmsInboundMessageDispatch> messages = 
createFullRangePrioritySet();
+
+        for (JmsInboundMessageDispatch envelope: messages) {
+            queue.enqueue(envelope);
+        }
+
+        for (byte i = 9; i >= 0; --i) {
+            JmsInboundMessageDispatch first = queue.peek();
+            assertEquals(i, first.getMessage().getJMSPriority());
+            queue.dequeueNoWait();
+        }
+
+        assertTrue(queue.isEmpty());
+    }
+
+    @Test
+    public void testPeekFirstSparse() throws JMSException {
+        queue.enqueue(createEnvelope(9));
+        queue.enqueue(createEnvelope(4));
+        queue.enqueue(createEnvelope(1));
+
+        JmsInboundMessageDispatch envelope = queue.peek();
+        assertEquals(9, envelope.getMessage().getJMSPriority());
+        queue.dequeueNoWait();
+        envelope = queue.peek();
+        assertEquals(4, envelope.getMessage().getJMSPriority());
+        queue.dequeueNoWait();
+        envelope = queue.peek();
+        assertEquals(1, envelope.getMessage().getJMSPriority());
+        queue.dequeueNoWait();
+
+        assertTrue(queue.isEmpty());
+    }
+
+    @Test
+    public void testDequeueWaitsUntilMessageArrives() throws 
InterruptedException {
+        final JmsInboundMessageDispatch message = createEnvelope();
+        Thread runner = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    TimeUnit.SECONDS.sleep(2);
+                } catch (InterruptedException e) {
+                }
+                queue.enqueueFirst(message);
+            }
+        });
+        runner.start();
+
+        assertSame(message, queue.dequeue(-1));
+    }
+
+    private List<JmsInboundMessageDispatch> createFullRangePrioritySet() {
+        List<JmsInboundMessageDispatch> messages = new 
ArrayList<JmsInboundMessageDispatch>();
+        for (int i = 0; i < 10; ++i) {
+            messages.add(createEnvelope(i));
+        }
+        return messages;
+    }
+
+    private JmsInboundMessageDispatch createEnvelope() {
+        JmsInboundMessageDispatch envelope = new 
JmsInboundMessageDispatch(sequence++);
+        envelope.setMessage(createMessage());
+        return envelope;
+    }
+
+    private JmsInboundMessageDispatch createEnvelope(int priority) {
+        JmsInboundMessageDispatch envelope = new 
JmsInboundMessageDispatch(sequence++);
+        envelope.setMessage(createMessage(priority));
+        return envelope;
+    }
+
+    private JmsMessage createMessage() {
+        return createMessage(4);
+    }
+
+    private JmsMessage createMessage(int priority) {
+        JmsDefaultMessageFacade facade = new JmsDefaultMessageFacade();
+        facade.setMessageId(messageId.generateId());
+        facade.setPriority((byte) priority);
+        JmsMessage message = new JmsMessage(facade);
+
+        return message;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to