Modified: qpid/branches/java-network-refactor/qpid/python/tests_0-9/queue.py URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/tests_0-9/queue.py?rev=812936&r1=812935&r2=812936&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/python/tests_0-9/queue.py (original) +++ qpid/branches/java-network-refactor/qpid/python/tests_0-9/queue.py Wed Sep 9 13:05:43 2009 @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,11 +19,137 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content -from qpid.testlib import TestBase +from qpid.testlib import testrunner, TestBase class QueueTests(TestBase): """Tests for 'methods' on the amqp queue 'class'""" + def test_purge(self): + """ + Test that the purge method removes messages from the queue + """ + channel = self.channel + #setup, declare a queue and add some messages to it: + channel.exchange_declare(exchange="test-exchange", type="direct") + channel.queue_declare(queue="test-queue", exclusive=True) + channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") + channel.message_transfer(destination="test-exchange", routing_key="key", body="one") + channel.message_transfer(destination="test-exchange", routing_key="key", body="two") + channel.message_transfer(destination="test-exchange", routing_key="key", body="three") + + #check that the queue now reports 3 messages: + reply = channel.queue_declare(queue="test-queue") + self.assertEqual(3, reply.message_count) + + #now do the purge, then test that three messages are purged and the count drops to 0 + reply = channel.queue_purge(queue="test-queue"); + self.assertEqual(3, reply.message_count) + reply = channel.queue_declare(queue="test-queue") + self.assertEqual(0, reply.message_count) + + #send a further message and consume it, ensuring that the other messages are really gone + channel.message_transfer(destination="test-exchange", routing_key="key", body="four") + channel.message_consume(queue="test-queue", destination="tag", no_ack=True) + queue = self.client.queue("tag") + msg = queue.get(timeout=1) + self.assertEqual("four", msg.body) + + #check error conditions (use new channels): + channel = self.client.channel(2) + channel.channel_open() + try: + #queue specified but doesn't exist: + channel.queue_purge(queue="invalid-queue") + self.fail("Expected failure when purging non-existent queue") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + channel = self.client.channel(3) + channel.channel_open() + try: + #queue not specified and none previously declared for channel: + channel.queue_purge() + self.fail("Expected failure when purging unspecified queue") + except Closed, e: + self.assertConnectionException(530, e.args[0]) + + #cleanup + other = self.connect() + channel = other.channel(1) + channel.channel_open() + channel.exchange_delete(exchange="test-exchange") + + def test_declare_exclusive(self): + """ + Test that the exclusive field is honoured in queue.declare + """ + # TestBase.setUp has already opened channel(1) + c1 = self.channel + # Here we open a second separate connection: + other = self.connect() + c2 = other.channel(1) + c2.channel_open() + + #declare an exclusive queue: + c1.queue_declare(queue="exclusive-queue", exclusive="True") + try: + #other connection should not be allowed to declare this: + c2.queue_declare(queue="exclusive-queue", exclusive="True") + self.fail("Expected second exclusive queue_declare to raise a channel exception") + except Closed, e: + self.assertChannelException(405, e.args[0]) + + + def test_declare_passive(self): + """ + Test that the passive field is honoured in queue.declare + """ + channel = self.channel + #declare an exclusive queue: + channel.queue_declare(queue="passive-queue-1", exclusive="True") + channel.queue_declare(queue="passive-queue-1", passive="True") + try: + #other connection should not be allowed to declare this: + channel.queue_declare(queue="passive-queue-2", passive="True") + self.fail("Expected passive declaration of non-existant queue to raise a channel exception") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + def test_bind(self): + """ + Test various permutations of the queue.bind method + """ + channel = self.channel + channel.queue_declare(queue="queue-1", exclusive="True") + + #straightforward case, both exchange & queue exist so no errors expected: + channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") + + #bind the default queue for the channel (i.e. last one declared): + channel.queue_bind(exchange="amq.direct", routing_key="key2") + + #use the queue name where neither routing key nor queue are specified: + channel.queue_bind(exchange="amq.direct") + + #try and bind to non-existant exchange + try: + channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") + self.fail("Expected bind to non-existant exchange to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + #need to reopen a channel: + channel = self.client.channel(2) + channel.channel_open() + + #try and bind non-existant queue: + try: + channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") + self.fail("Expected bind of non-existant queue to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + def test_unbind_direct(self): self.unbind_test(exchange="amq.direct", routing_key="key") @@ -39,12 +165,12 @@ def unbind_test(self, exchange, routing_key="", args=None, headers={}): #bind two queues and consume from them channel = self.channel - + channel.queue_declare(queue="queue-1", exclusive="True") channel.queue_declare(queue="queue-2", exclusive="True") - channel.basic_consume(queue="queue-1", consumer_tag="queue-1", no_ack=True) - channel.basic_consume(queue="queue-2", consumer_tag="queue-2", no_ack=True) + channel.message_consume(queue="queue-1", destination="queue-1", no_ack=True) + channel.message_consume(queue="queue-2", destination="queue-2", no_ack=True) queue1 = self.client.queue("queue-1") queue2 = self.client.queue("queue-2") @@ -53,29 +179,130 @@ channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) #send a message that will match both bindings - channel.basic_publish(exchange=exchange, routing_key=routing_key, - content=Content("one", properties={"headers": headers})) - + channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="one") + #unbind first queue channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) - + #send another message - channel.basic_publish(exchange=exchange, routing_key=routing_key, - content=Content("two", properties={"headers": headers})) + channel.message_transfer(destination=exchange, routing_key=routing_key, application_headers=headers, body="two") #check one queue has both messages and the other has only one - self.assertEquals("one", queue1.get(timeout=1).content.body) + self.assertEquals("one", queue1.get(timeout=1).body) try: msg = queue1.get(timeout=1) self.fail("Got extra message: %s" % msg.body) except Empty: pass - self.assertEquals("one", queue2.get(timeout=1).content.body) - self.assertEquals("two", queue2.get(timeout=1).content.body) + self.assertEquals("one", queue2.get(timeout=1).body) + self.assertEquals("two", queue2.get(timeout=1).body) try: msg = queue2.get(timeout=1) self.fail("Got extra message: " + msg) - except Empty: pass + except Empty: pass + + + def test_delete_simple(self): + """ + Test core queue deletion behaviour + """ + channel = self.channel + + #straight-forward case: + channel.queue_declare(queue="delete-me") + channel.message_transfer(routing_key="delete-me", body="a") + channel.message_transfer(routing_key="delete-me", body="b") + channel.message_transfer(routing_key="delete-me", body="c") + reply = channel.queue_delete(queue="delete-me") + self.assertEqual(3, reply.message_count) + #check that it has gone be declaring passively + try: + channel.queue_declare(queue="delete-me", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + #check attempted deletion of non-existant queue is handled correctly: + channel = self.client.channel(2) + channel.channel_open() + try: + channel.queue_delete(queue="i-dont-exist", if_empty="True") + self.fail("Expected delete of non-existant queue to fail") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + + + def test_delete_ifempty(self): + """ + Test that if_empty field of queue_delete is honoured + """ + channel = self.channel + + #create a queue and add a message to it (use default binding): + channel.queue_declare(queue="delete-me-2") + channel.queue_declare(queue="delete-me-2", passive="True") + channel.message_transfer(routing_key="delete-me-2", body="message") + + #try to delete, but only if empty: + try: + channel.queue_delete(queue="delete-me-2", if_empty="True") + self.fail("Expected delete if_empty to fail for non-empty queue") + except Closed, e: + self.assertChannelException(406, e.args[0]) + + #need new channel now: + channel = self.client.channel(2) + channel.channel_open() + + #empty queue: + channel.message_consume(destination="consumer_tag", queue="delete-me-2", no_ack=True) + queue = self.client.queue("consumer_tag") + msg = queue.get(timeout=1) + self.assertEqual("message", msg.body) + channel.message_cancel(destination="consumer_tag") + + #retry deletion on empty queue: + channel.queue_delete(queue="delete-me-2", if_empty="True") + + #check that it has gone by declaring passively: + try: + channel.queue_declare(queue="delete-me-2", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + + def test_delete_ifunused(self): + """ + Test that if_unused field of queue_delete is honoured + """ + channel = self.channel + + #create a queue and register a consumer: + channel.queue_declare(queue="delete-me-3") + channel.queue_declare(queue="delete-me-3", passive="True") + channel.message_consume(destination="consumer_tag", queue="delete-me-3", no_ack=True) + + #need new channel now: + channel2 = self.client.channel(2) + channel2.channel_open() + #try to delete, but only if empty: + try: + channel2.queue_delete(queue="delete-me-3", if_unused="True") + self.fail("Expected delete if_unused to fail for queue with existing consumer") + except Closed, e: + self.assertChannelException(406, e.args[0]) + + + channel.message_cancel(destination="consumer_tag") + channel.queue_delete(queue="delete-me-3", if_unused="True") + #check that it has gone by declaring passively: + try: + channel.queue_declare(queue="delete-me-3", passive="True") + self.fail("Queue has not been deleted") + except Closed, e: + self.assertChannelException(404, e.args[0]) + def test_autodelete_shared(self): """ @@ -109,3 +336,5 @@ self.fail("Expected queue to have been deleted") except Closed, e: self.assertChannelException(404, e.args[0]) + +
Propchange: qpid/branches/java-network-refactor/qpid/python/tests_0-9/queue.py ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Sep 9 13:05:43 2009 @@ -0,0 +1 @@ +/incubator/qpid/trunk/qpid/python/tests_0-9/queue.py:520691-726139 Propchange: qpid/branches/java-network-refactor/qpid/ruby/ext/sasl/extconf.rb ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Sep 9 13:05:43 2009 @@ -0,0 +1 @@ +/incubator/qpid/trunk/qpid/ruby/ext/sasl/extconf.rb:520691-726139 Modified: qpid/branches/java-network-refactor/qpid/specs/management-schema.xml URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/specs/management-schema.xml?rev=812936&r1=812935&r2=812936&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/specs/management-schema.xml (original) +++ qpid/branches/java-network-refactor/qpid/specs/management-schema.xml Wed Sep 9 13:05:43 2009 @@ -164,11 +164,13 @@ =============================================================== --> <class name="Exchange"> - <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> - <property name="name" type="sstr" access="RC" index="y"/> - <property name="type" type="sstr" access="RO"/> - <property name="durable" type="bool" access="RC"/> - <property name="arguments" type="map" access="RO" desc="Arguments supplied in exchange.declare"/> + <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> + <property name="name" type="sstr" access="RC" index="y"/> + <property name="type" type="sstr" access="RO"/> + <property name="durable" type="bool" access="RO"/> + <property name="autoDelete" type="bool" access="RO"/> + <property name="altExchange" type="objId" access="RO" optional="y"/> + <property name="arguments" type="map" access="RO" desc="Arguments supplied in exchange.declare"/> <statistic name="producerCount" type="hilo32" desc="Current producers on exchange"/> <statistic name="bindingCount" type="hilo32" desc="Current bindings"/> --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org