Author: aconway Date: Sat Nov 22 02:54:59 2014 New Revision: 1641027 URL: http://svn.apache.org/r1641027 Log: NO-JIRA: Fix qpid-cpp-benchmark to work with activemq broker.
The activemq broker appears not to respect the AMQP drain flag, so calling fetch() on an empty queue hangs. Use get() with a timeout instead to drain the queues. Modified: qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark Modified: qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i?rev=1641027&r1=1641026&r2=1641027&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i (original) +++ qpid/trunk/qpid/cpp/bindings/qpid/python/qpid_messaging.i Sat Nov 22 02:54:59 2014 @@ -151,6 +151,7 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi %rename(_next_receiver) qpid::messaging::Session::nextReceiver; %rename(_fetch) qpid::messaging::Receiver::fetch; +%rename(_get) qpid::messaging::Receiver::get; %rename(unsettled) qpid::messaging::Receiver::getUnsettled; %rename(available) qpid::messaging::Receiver::getAvailable; @@ -346,6 +347,16 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi # but C++ API uses milliseconds return self._fetch(Duration(int(1000*timeout))) %} + + %pythoncode %{ + def get(self, timeout=None) : + if timeout is None : + return self._get() + else : + # Python API uses timeouts in seconds, + # but C++ API uses milliseconds + return self._get(Duration(int(1000*timeout))) + %} } %extend qpid::messaging::Sender { Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1641027&r1=1641026&r2=1641027&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark (original) +++ qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark Sat Nov 22 02:54:59 2014 @@ -78,6 +78,8 @@ op.add_option("--connection-options", ty help="Connection options for senders & receivers") op.add_option("--durable", default=False, action="store_true", help="Use durable queues and messages") +op.add_option("-t", "--timeout", default=1.0, type="float", metavar="SECONDS", + help="Timeout for fetch operations (default %default)") op.add_option("--save-received", default=False, action="store_true", help="Save received message content to files <queuename>-receiver-<n>.msg") op.add_option("--verbose", default=False, action="store_true", @@ -195,7 +197,11 @@ def drain(queue, session, opts): n = 0 try: while True: - r.fetch(timeout=0) + # FIXME aconway 2014-11-21: activemq broker does not respect the drain flag + # so fetch on an empty queue will hang forever, use get with timeout instead. + # r.fetch(timeout=0) + m = qm.Message() + r.get(timeout=opts.timeout) n += 1 if n % 500 == 0: r.session.acknowledge() r.session.acknowledge() @@ -271,7 +277,7 @@ class ReadyReceiver: self.connection = connect(broker, opts) self.receiver = self.connection.session().receiver(queue) self.receiver.session.sync() - self.timeout=10 + self.timeout=opts.timeout def wait(self, receivers): try: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org