Author: rhs Date: Tue Jun 2 14:24:57 2009 New Revision: 781044 URL: http://svn.apache.org/viewvc?rev=781044&view=rev Log: first commit of new messaging API and test harness
Added: qpid/trunk/qpid/python/qpid-python-test (with props) qpid/trunk/qpid/python/qpid/debug.py qpid/trunk/qpid/python/qpid/messaging.py qpid/trunk/qpid/python/qpid/tests/ qpid/trunk/qpid/python/qpid/tests/__init__.py qpid/trunk/qpid/python/qpid/tests/messaging.py Modified: qpid/trunk/qpid/python/qpid/testlib.py Added: qpid/trunk/qpid/python/qpid-python-test URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid-python-test?rev=781044&view=auto ============================================================================== --- qpid/trunk/qpid/python/qpid-python-test (added) +++ qpid/trunk/qpid/python/qpid-python-test Tue Jun 2 14:24:57 2009 @@ -0,0 +1,450 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, os, sys, logging, traceback, types +from fnmatch import fnmatchcase as match +from getopt import GetoptError +from logging import getLogger, StreamHandler, Formatter, Filter, \ + WARN, DEBUG, ERROR +from qpid.util import URL + +levels = { + "DEBUG": DEBUG, + "WARN": WARN, + "ERROR": ERROR + } + +sorted_levels = [(v, k) for k, v in levels.items()] +sorted_levels.sort() +sorted_levels = [v for k, v in sorted_levels] + +parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...", + description="Run tests matching the specified PATTERNs.") +parser.add_option("-l", "--list", action="store_true", default=False, + help="list tests instead of executing them") +parser.add_option("-b", "--broker", default="localhost", + help="run tests against BROKER (default %default)") +parser.add_option("-f", "--log-file", metavar="FILE", help="log output to FILE") +parser.add_option("-v", "--log-level", metavar="LEVEL", default="WARN", + help="only display log messages of LEVEL or higher severity: " + "%s (default %%default)" % ", ".join(sorted_levels)) +parser.add_option("-c", "--log-category", metavar="CATEGORY", action="append", + dest="log_categories", default=[], + help="log only categories matching CATEGORY pattern") +parser.add_option("-i", "--ignore", action="append", default=[], + help="ignore tests matching IGNORE pattern") +parser.add_option("-I", "--ignore-file", metavar="IFILE", action="append", + default=[], + help="ignore tests matching patterns in IFILE") + +class Config: + + def __init__(self): + self.broker = URL("localhost") + self.work = None + self.log_file = None + self.log_level = WARN + self.log_categories = [] + +opts, args = parser.parse_args() + +includes = [] +excludes = ["*__*__"] +config = Config() +list_only = opts.list +config.broker = URL(opts.broker) +config.log_file = opts.log_file +config.log_level = levels[opts.log_level.upper()] +config.log_categories = opts.log_categories +excludes.extend([v.strip() for v in opts.ignore]) +for v in opts.ignore_file: + f = open(v) + for line in f: + line = line.strip() + if line.startswith("#"): + continue + excludes.append(line) + f.close() + +for a in args: + includes.append(a.strip()) + +if not includes: + includes.append("*") + +def included(path): + for p in excludes: + if match(path, p): + return False + for p in includes: + if match(path, p): + return True + return False + +def vt100_attrs(*attrs): + return "\x1B[%sm" % ";".join(map(str, attrs)) + +vt100_reset = vt100_attrs(0) + +KEYWORDS = {"pass": (32,), + "fail": (31,), + "start": (34,), + "total": (34,)} + +COLORIZE = sys.stdout.isatty() + +def colorize_word(word, text=None): + if text is None: + text = word + return colorize(text, *KEYWORDS.get(word, ())) + +def colorize(text, *attrs): + if attrs and COLORIZE: + return "%s%s%s" % (vt100_attrs(*attrs), text, vt100_reset) + else: + return text + +def indent(text): + lines = text.split("\n") + return " %s" % "\n ".join(lines) + +from qpid.testlib import testrunner +testrunner.setBroker(str(config.broker)) + +class Interceptor: + + def __init__(self): + self.newline = False + self.indent = False + self.passthrough = True + self.dirty = False + self.last = None + + def begin(self): + self.newline = True + self.indent = True + self.passthrough = False + self.dirty = False + self.last = None + + def reset(self): + self.newline = False + self.indent = False + self.passthrough = True + +class StreamWrapper: + + def __init__(self, interceptor, stream, prefix=" "): + self.interceptor = interceptor + self.stream = stream + self.prefix = prefix + + def isatty(self): + return self.stream.isatty() + + def write(self, s): + if self.interceptor.passthrough: + self.stream.write(s) + return + + if s: + self.interceptor.dirty = True + + if self.interceptor.newline: + self.interceptor.newline = False + self.stream.write(" %s\n" % colorize_word("start")) + self.interceptor.indent = True + if self.interceptor.indent: + self.stream.write(self.prefix) + if s.endswith("\n"): + s = s.replace("\n", "\n%s" % self.prefix)[:-2] + self.interceptor.indent = True + else: + s = s.replace("\n", "\n%s" % self.prefix) + self.interceptor.indent = False + self.stream.write(s) + + if s: + self.interceptor.last = s[-1] + + def flush(self): + self.stream.flush() + +interceptor = Interceptor() + +out_wrp = StreamWrapper(interceptor, sys.stdout) +err_wrp = StreamWrapper(interceptor, sys.stderr) + +out = sys.stdout +err = sys.stderr +sys.stdout = out_wrp +sys.stderr = err_wrp + +class PatternFilter(Filter): + + def __init__(self, *patterns): + Filter.__init__(self, patterns) + self.patterns = patterns + + def filter(self, record): + if not self.patterns: + return True + for p in self.patterns: + if match(record.name, p): + return True + return False + +root = getLogger() +handler = StreamHandler(sys.stdout) +filter = PatternFilter(*config.log_categories) +handler.addFilter(filter) +handler.setFormatter(Formatter("%(asctime)s %(levelname)s %(message)s")) +root.addHandler(handler) +root.setLevel(WARN) + +log = getLogger("qpid.test") + +class Runner: + + def __init__(self): + self.exceptions = [] + + def passed(self): + return not self.exceptions + + def failed(self): + return self.exceptions + + def run(self, name, phase): + try: + phase() + except KeyboardInterrupt: + raise + except: + self.exceptions.append((name, sys.exc_info())) + + def status(self): + if self.passed(): + return "pass" + else: + return "fail" + + def print_exceptions(self): + for name, info in self.exceptions: + print "Error during %s:" % name + print indent("".join(traceback.format_exception(*info))).rstrip() + +def run_test(name, test, config): + patterns = filter.patterns + level = root.level + filter.patterns = config.log_categories + root.setLevel(config.log_level) + + parts = name.split(".") + line = None + output = "" + for part in parts: + if line: + if len(line) + len(part) >= 71: + output += "%s. \\\n" % line + line = " %s" % part + else: + line = "%s.%s" % (line, part) + else: + line = part + + if line: + output += "%s %s" % (line, ((72 - len(line))*".")) + sys.stdout.write(output) + sys.stdout.flush() + interceptor.begin() + try: + runner = test() + finally: + interceptor.reset() + if interceptor.dirty: + if interceptor.last != "\n": + sys.stdout.write("\n") + sys.stdout.write(output) + print " %s" % colorize_word(runner.status()) + if runner.failed(): + runner.print_exceptions() + root.setLevel(level) + filter.patterns = patterns + return runner.passed() + +class FunctionTest: + + def __init__(self, test): + self.test = test + + def name(self): + return "%s.%s" % (self.test.__module__, self.test.__name__) + + def run(self): + return run_test(self.name(), self._run, config) + + def _run(self): + runner = Runner() + runner.run("test", lambda: self.test(config)) + return runner + + def __repr__(self): + return "FunctionTest(%r)" % self.test + +class MethodTest: + + def __init__(self, cls, method): + self.cls = cls + self.method = method + + def name(self): + return "%s.%s.%s" % (self.cls.__module__, self.cls.__name__, self.method) + + def run(self): + return run_test(self.name(), self._run, config) + + def _run(self): + runner = Runner() + inst = self.cls(self.method) + test = getattr(inst, self.method) + + if hasattr(inst, "configure"): + runner.run("configure", lambda: inst.configure(config)) + if runner.failed(): return runner + if hasattr(inst, "setUp"): + runner.run("setup", inst.setUp) + if runner.failed(): return runner + elif hasattr(inst, "setup"): + runner.run("setup", inst.setup) + if runner.failed(): return runner + + runner.run("test", test) + + if hasattr(inst, "tearDown"): + runner.run("teardown", inst.tearDown) + elif hasattr(inst, "teardown"): + runner.run("teardown", inst.teardown) + + return runner + + def __repr__(self): + return "MethodTest(%r, %r)" % (self.cls, self.method) + +class PatternMatcher: + + def __init__(self, *patterns): + self.patterns = patterns + + def matches(self, name): + for p in self.patterns: + if match(name, p): + return True + return False + +class FunctionScanner(PatternMatcher): + + def inspect(self, obj): + return type(obj) == types.FunctionType and self.matches(name) + + def descend(self, func): + return; yield + + def extract(self, func): + yield FunctionTest(func) + +class ClassScanner(PatternMatcher): + + def inspect(self, obj): + return type(obj) in (types.ClassType, types.TypeType) and self.matches(obj.__name__) + + def descend(self, cls): + return; yield + + def extract(self, cls): + names = dir(cls) + names.sort() + for name in names: + obj = getattr(cls, name) + t = type(obj) + if t == types.MethodType and name.startswith("test"): + yield MethodTest(cls, name) + +class ModuleScanner: + + def inspect(self, obj): + return type(obj) == types.ModuleType + + def descend(self, obj): + names = dir(obj) + names.sort() + for name in names: + yield getattr(obj, name) + + def extract(self, obj): + return; yield + +class Harness: + + def __init__(self): + self.scanners = [ + ModuleScanner(), + ClassScanner("*Test", "*Tests", "*TestCase"), + FunctionScanner("test_*") + ] + self.tests = [] + self.scanned = [] + + def scan(self, *roots): + objects = list(roots) + + while objects: + obj = objects.pop(0) + for s in self.scanners: + if s.inspect(obj): + self.tests.extend(s.extract(obj)) + for child in s.descend(obj): + if not (child in self.scanned or child in objects): + objects.append(child) + self.scanned.append(obj) + +modules = "qpid.tests", "tests", "tests_0-10" +h = Harness() +for name in modules: + m = __import__(name, fromlist=["dummy"]) + h.scan(m) + +filtered = [t for t in h.tests if included(t.name())] +passed = 0 +failed = 0 +for t in filtered: + if list_only: + print t.name() + else: + if t.run(): + passed += 1 + else: + failed += 1 + +if not list_only: + print colorize("Totals:", 1), \ + colorize_word("total", "%s tests" % len(filtered)) + ",", \ + colorize_word("pass", "%s passed" % passed) + ",", \ + colorize_word("fail" if failed else "pass", "%s failed" % failed) Propchange: qpid/trunk/qpid/python/qpid-python-test ------------------------------------------------------------------------------ svn:executable = * Added: qpid/trunk/qpid/python/qpid/debug.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/debug.py?rev=781044&view=auto ============================================================================== --- qpid/trunk/qpid/python/qpid/debug.py (added) +++ qpid/trunk/qpid/python/qpid/debug.py Tue Jun 2 14:24:57 2009 @@ -0,0 +1,45 @@ +import traceback, time, sys + +from threading import RLock + +def stackdump(*args): + print args + code = [] + for threadId, stack in sys._current_frames().items(): + code.append("\n# ThreadID: %s" % threadId) + for filename, lineno, name, line in traceback.extract_stack(stack): + code.append('File: "%s", line %d, in %s' % (filename, lineno, name)) + if line: + code.append(" %s" % (line.strip())) + print "\n".join(code) + +import signal +signal.signal(signal.SIGQUIT, stackdump) + +#out = open("/tmp/stacks.txt", "write") + +class LoudLock: + + def __init__(self): + self.lock = RLock() + + def acquire(self, blocking=1): + import threading + while not self.lock.acquire(blocking=0): + time.sleep(1) + print >> out, "TRYING" +# print self.lock._RLock__owner, threading._active +# stackdump() + traceback.print_stack(None, None, out) + print >> out, "TRYING" + print >> out, "ACQUIRED" + traceback.print_stack(None, None, out) + print >> out, "ACQUIRED" + return True + + def _is_owned(self): + return self.lock._is_owned() + + def release(self): + self.lock.release() + Added: qpid/trunk/qpid/python/qpid/messaging.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=781044&view=auto ============================================================================== --- qpid/trunk/qpid/python/qpid/messaging.py (added) +++ qpid/trunk/qpid/python/qpid/messaging.py Tue Jun 2 14:24:57 2009 @@ -0,0 +1,807 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +A candidate high level messaging API for python. + +Areas that still need work: + + - asynchronous send + - asynchronous error notification + - definition of the arguments for L{Session.sender} and L{Session.receiver} + - standard L{Message} properties + - L{Message} content encoding + - protocol negotiation/multiprotocol impl +""" + +import connection, time, sys, traceback +from codec010 import StringCodec +from datatypes import timestamp, uuid4, RangedSet, Message as Message010 +from logging import getLogger +from session import Client, INCOMPLETE +from spec import SPEC +from threading import Thread, RLock, Condition +from util import connect + +log = getLogger("qpid.messaging") + +static = staticmethod + +def synchronized(meth): + def sync_wrapper(self, *args, **kwargs): + self.lock() + try: + return meth(self, *args, **kwargs) + finally: + self.unlock() + return sync_wrapper + +class Lockable(object): + + def lock(self): + self._lock.acquire() + + def unlock(self): + self._lock.release() + + def wait(self, predicate, timeout=None): + passed = 0 + start = time.time() + while not predicate(): + if timeout is None: + # using the timed wait prevents keyboard interrupts from being + # blocked while waiting + self._condition.wait(3) + elif passed < timeout: + self._condition.wait(timeout - passed) + else: + return False + passed = time.time() - start + return True + + def notify(self): + self._condition.notify() + + def notifyAll(self): + self._condition.notifyAll() + +def default(value, default): + if value is None: + return default + else: + return value + +AMQP_PORT = 5672 +AMQPS_PORT = 5671 + +class Connection(Lockable): + + """ + A Connection manages a group of L{Sessions<Session>} and connects + them with a remote endpoint. + """ + + @static + def open(host, port=None): + """ + Creates an AMQP connection and connects it to the given host and port. + + @type host: str + @param host: the name or ip address of the remote host + @type port: int + @param port: the port number of the remote host + @rtype: Connection + @return: a connected Connection + """ + conn = Connection(host, port) + conn.connect() + return conn + + def __init__(self, host, port=None): + """ + Creates a connection. A newly created connection must be connected + with the Connection.connect() method before it can be started. + + @type host: str + @param host: the name or ip address of the remote host + @type port: int + @param port: the port number of the remote host + @rtype: Connection + @return: a disconnected Connection + """ + self.host = host + self.port = default(port, AMQP_PORT) + self.started = False + self._conn = None + self.id = str(uuid4()) + self.session_counter = 0 + self.sessions = {} + self._lock = RLock() + self._condition = Condition(self._lock) + + @synchronized + def session(self, name=None): + """ + Creates or retrieves the named session. If the name is omitted or + None, then a unique name is chosen based on a randomly generated + uuid. + + @type name: str + @param name: the session name + @rtype: Session + @return: the named Session + """ + + if name is None: + name = "%s:%s" % (self.id, self.session_counter) + self.session_counter += 1 + else: + name = "%s:%s" % (self.id, name) + + if self.sessions.has_key(name): + return self.sessions[name] + else: + ssn = Session(self, name, self.started) + self.sessions[name] = ssn + if self._conn is not None: + ssn._attach() + return ssn + + @synchronized + def _remove_session(self, ssn): + del self.sessions[ssn.name] + + @synchronized + def connect(self): + """ + Connect to the remote endpoint. + """ + if self._conn is not None: + return + self._socket = connect(self.host, self.port) + self._conn = connection.Connection(self._socket) + self._conn.start() + + for ssn in self.sessions.values(): + ssn._attach() + + @synchronized + def disconnect(self): + """ + Disconnect from the remote endpoint. + """ + if self._conn is not None: + self._conn.close() + self._conn = None + for ssn in self.sessions.values(): + ssn._disconnected() + + @synchronized + def connected(self): + """ + Return true if the connection is connected, false otherwise. + """ + return self._conn is not None + + @synchronized + def start(self): + """ + Start incoming message delivery for all sessions. + """ + self.started = True + for ssn in self.sessions.values(): + ssn.start() + + @synchronized + def stop(self): + """ + Stop incoming message deliveries for all sessions. + """ + for ssn in self.sessions.values(): + ssn.stop() + self.started = False + + @synchronized + def close(self): + """ + Close the connection and all sessions. + """ + for ssn in self.sessions.values(): + ssn.close() + self.disconnect() + +class Pattern: + """ + The pattern filter matches the supplied wildcard pattern against a + message subject. + """ + + def __init__(self, value): + self.value = value + + def _bind(self, ssn, exchange, queue): + ssn.exchange_bind(exchange=exchange, queue=queue, + binding_key=self.value.replace("*", "#")) + +FILTER_DEFAULTS = { + "topic": Pattern("*") + } + +def delegate(session): + class Delegate(Client): + + def message_transfer(self, cmd, headers, body): + session._message_transfer(cmd, headers, body) + return Delegate + +class Session(Lockable): + + """ + Sessions provide a linear context for sending and receiving + messages, and manage various Senders and Receivers. + """ + + def __init__(self, connection, name, started): + self.connection = connection + self.name = name + self.started = started + self._ssn = None + self.senders = [] + self.receivers = [] + self.closing = False + self.incoming = [] + self.closed = False + self.unacked = [] + self._lock = RLock() + self._condition = Condition(self._lock) + self.thread = Thread(target = self.run) + self.thread.setDaemon(True) + self.thread.start() + + def __repr__(self): + return "<Session %s>" % self.name + + def _attach(self): + self._ssn = self.connection._conn.session(self.name, delegate=delegate(self)) + self._ssn.auto_sync = False + self._ssn.invoke_lock = self._lock + self._ssn.lock = self._lock + self._ssn.condition = self._condition + for link in self.senders + self.receivers: + link._link() + + def _disconnected(self): + self._ssn = None + for link in self.senders + self.receivers: + link._disconnected() + + @synchronized + def _message_transfer(self, cmd, headers, body): + m = Message010(body) + m.headers = headers + m.id = cmd.id + msg = self._decode(m) + rcv = self.receivers[int(cmd.destination)] + msg._receiver = rcv + log.debug("RECV [%s] %s", self, msg) + self.incoming.append(msg) + self.notifyAll() + return INCOMPLETE + + def _decode(self, message): + dp = message.get("delivery_properties") + mp = message.get("message_properties") + ap = mp.application_headers + enc, dec = get_codec(mp.content_type) + content = dec(message.body) + msg = Message(content) + msg.id = mp.message_id + if ap is not None: + msg.to = ap.get("to") + msg.subject = ap.get("subject") + msg.user_id = mp.user_id + if mp.reply_to is not None: + msg.reply_to = reply_to2addr(mp.reply_to) + msg.correlation_id = mp.correlation_id + msg.properties = mp.application_headers + msg.content_type = mp.content_type + msg._transfer_id = message.id + return msg + + def _exchange_query(self, address): + # XXX: auto sync hack is to avoid deadlock on future + result = self._ssn.exchange_query(name=address, sync=True) + self._ssn.sync() + return result.get() + + @synchronized + def sender(self, target): + """ + Creates a L{Sender} that may be used to send L{Messages<Message>} + to the specified target. + + @type target: str + @param target: the target to which messages will be sent + @rtype: Sender + @return: a new Sender for the specified target + """ + sender = Sender(self, len(self.senders), target) + self.senders.append(sender) + if self._ssn is not None: + sender._link() + return sender + + @synchronized + def receiver(self, source, filter=None): + """ + Creates a receiver that may be used to actively fetch or to listen + for the arrival of L{Messages<Message>} from the specified source. + + @type source: str + @param source: the source of L{Messages<Message>} + @rtype: Receiver + @return: a new Receiver for the specified source + """ + receiver = Receiver(self, len(self.receivers), source, filter, + self.started) + self.receivers.append(receiver) + if self._ssn is not None: + receiver._link() + return receiver + + def _peek(self, predicate): + for msg in self.incoming: + if predicate(msg): + return msg + + def _pop(self, predicate): + i = 0 + while i < len(self.incoming): + msg = self.incoming[i] + if predicate(msg): + del self.incoming[i] + return msg + else: + i += 1 + + @synchronized + def _get(self, predicate, timeout=None): + if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing), + timeout): + msg = self._pop(predicate) + if msg is not None: + self.unacked.append(msg) + log.debug("RETR [%s] %s", self, msg) + return msg + return None + + @synchronized + def acknowledge(self, message=None): + """ + Acknowledge the given L{Message}. If message is None, then all + unackednowledged messages on the session are acknowledged. + + @type message: Message + @param message: the message to acknowledge or None + """ + if message is None: + messages = self.unacked + else: + messages = [message] + + ids = RangedSet(*[m._transfer_id for m in self.unacked]) + for range in ids: + self._ssn.receiver._completed.add_range(range) + self._ssn.channel.session_completed(self._ssn.receiver._completed) + self._ssn.message_accept(ids, sync=True) + self._ssn.sync() + + for m in messages: + try: + self.unacked.remove(m) + except ValueError: + pass + + @synchronized + def start(self): + """ + Start incoming message delivery for the session. + """ + self.started = True + for rcv in self.receivers: + rcv.start() + + @synchronized + def stop(self): + """ + Stop incoming message delivery for the session. + """ + for rcv in self.receivers: + rcv.stop() + # TODO: think about stopping individual receivers in listen mode + self.wait(lambda: self._peek(self._pred) is None) + self.started = False + + def _pred(self, m): + return m._receiver.listener is not None + + @synchronized + def run(self): + try: + while True: + msg = self._get(self._pred) + if msg is None: + break; + else: + msg._receiver.listener(msg) + if self._peek(self._pred) is None: + self.notifyAll() + finally: + self.closed = True + self.notifyAll() + + @synchronized + def close(self): + """ + Close the session. + """ + for link in self.receivers + self.senders: + link.close() + + self.closing = True + self.notifyAll() + self.wait(lambda: self.closed) + while self.thread.isAlive(): + self.thread.join(3) + self.thread = None + self._ssn.close() + self._ssn = None + self.connection._remove_session(self) + +def parse_addr(address): + parts = address.split("/", 1) + if len(parts) == 1: + return parts[0], None + else: + return parts[0], parts[i1] + +def reply_to2addr(reply_to): + if reply_to.routing_key is None: + return reply_to.exchange + elif reply_to.exchange in (None, ""): + return reply_to.routing_key + else: + return "%s/%s" % (reply_to.exchange, reply_to.routing_key) + +class Disconnected(Exception): + """ + Exception raised when an operation is attempted that is illegal when + disconnected. + """ + pass + +class Sender(Lockable): + + """ + Sends outgoing messages. + """ + + def __init__(self, session, index, target): + self.session = session + self.index = index + self.target = target + self.closed = False + self._ssn = None + self._exchange = None + self._routing_key = None + self._subject = None + self._lock = self.session._lock + self._condition = self.session._condition + + def _link(self): + self._ssn = self.session._ssn + node, self._subject = parse_addr(self.target) + result = self.session._exchange_query(node) + if result.not_found: + # XXX: should check 'create' option + self._ssn.queue_declare(queue=node, durable=False, sync=True) + self._ssn.sync() + self._exchange = "" + self._routing_key = node + else: + self._exchange = node + self._routing_key = self._subject + + def _disconnected(self): + self._ssn = None + + @synchronized + def send(self, object): + """ + Send a message. If the object passed in is of type L{unicode}, + L{str}, L{list}, or L{dict}, it will automatically be wrapped in a + L{Message} and sent. If it is of type L{Message}, it will be sent + directly. + + @type object: unicode, str, list, dict, Message + @param object: the message or content to send + """ + + if self._ssn is None: + raise Disconnected() + + if isinstance(object, Message): + message = object + else: + message = Message(object) + # XXX: what if subject is specified for a normal queue? + rk = message.subject if self._routing_key is None else self._routing_key + # XXX: do we need to query to figure out how to create the reply-to interoperably? + rt = self._ssn.reply_to(*parse_addr(message.reply_to)) if message.reply_to else None + dp = self._ssn.delivery_properties(routing_key=rk) + mp = self._ssn.message_properties(message_id=message.id, + user_id=message.user_id, + reply_to=rt, + correlation_id=message.correlation_id, + content_type=message.content_type, + application_headers=message.properties) + if message.subject is not None: + if mp.application_headers is None: + mp.application_headers = {} + mp.application_headers["subject"] = message.subject + if message.to is not None: + if mp.application_headers is None: + mp.application_headers = {} + mp.application_headers["to"] = message.to + enc, dec = get_codec(message.content_type) + body = enc(message.content) + self._ssn.message_transfer(destination=self._exchange, + message=Message010(dp, mp, body), + sync=True) + log.debug("SENT [%s] %s", self.session, message) + self._ssn.sync() + + @synchronized + def close(self): + """ + Close the Sender. + """ + if not self.closed: + self.session.senders.remove(self) + self.closed = True + +class Empty(Exception): + """ + Exception raised by L{Receiver.fetch} when there is no message + available within the alloted time. + """ + pass + +class Receiver(Lockable): + + """ + Receives incoming messages from a remote source. Messages may be + actively fetched with L{fetch} or a listener may be installed with + L{listen}. + """ + + def __init__(self, session, index, source, filter, started): + self.session = session + self.index = index + self.destination = str(self.index) + self.source = source + self.filter = filter + self.started = started + self.closed = False + self.incoming = [] + self.listener = None + self._ssn = None + self._queue = None + self._lock = self.session._lock + self._condition = self.session._condition + + def _link(self): + self._ssn = self.session._ssn + result = self.session._exchange_query(self.source) + if result.not_found: + self._queue = self.source + # XXX: should check 'create' option + self._ssn.queue_declare(queue=self._queue, durable=False) + else: + self._queue = "%s.%s" % (self.session.name, self.destination) + self._ssn.queue_declare(queue=self._queue, durable=False, exclusive=True, auto_delete=True) + f = FILTER_DEFAULTS[result.type] if self.filter is None else self.filter + f._bind(self._ssn, self.source, self._queue) + self._ssn.message_subscribe(queue=self._queue, destination=self.destination, + sync=True) + self._ssn.message_set_flow_mode(self.destination, self._ssn.flow_mode.credit) + self._ssn.sync() + if self.started: + self._start() + + def _disconnected(self): + self._ssn = None + + @synchronized + def listen(self, listener=None): + """ + Sets the message listener for this receiver. + + @type listener: callable + @param listener: a callable object to be notified on message arrival + """ + self.listener = listener + if self.listener is None: + self._ssn.message_stop(self.destination) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL, + sync=True) + self._ssn.sync() + else: + self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL) + + def _pred(self, msg): + return msg._receiver == self + + @synchronized + def fetch(self, timeout=None): + """ + Fetch and return a single message. A timeout of None will block + forever waiting for a message to arrive, a timeout of zero will + return immediately if no messages are available. + + @type timeout: float + @param timeout: the time to wait for a message to be available + """ + self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, + 0xFFFFFFFFL) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1) + msg = self.session._get(self._pred, timeout=timeout) + if msg is None: + self._ssn.message_flush(self.destination) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, + 0xFFFFFFFFL, sync=True) + self._ssn.sync() + msg = self.session._get(self._pred, timeout=0) + if msg is None: + raise Empty() + return msg + + def _start(self): + self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL) + if self.listener is not None: + self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL) + + @synchronized + def start(self): + """ + Start incoming message delivery for this receiver. + """ + self.started = True + if self._ssn is not None: + self._start() + + def _stop(self): + self._ssn.message_stop(self.destination) + + @synchronized + def stop(self): + """ + Stop incoming message delivery for this receiver. + """ + if self._ssn is not None: + self._stop() + self.started = False + + @synchronized + def close(self): + """ + Close the receiver. + """ + if not self.closed: + self.closed = True + self._ssn.message_cancel(self.destination, sync=True) + self._ssn.sync() + self.session.receivers.remove(self) + + + +def codec(name): + type = SPEC.named[name] + + def encode(x): + sc = StringCodec(SPEC) + type.encode(sc, x) + return sc.encoded + + def decode(x): + sc = StringCodec(SPEC, x) + return type.decode(sc) + + return encode, decode + +TYPE_MAPPINGS={ + dict: "amqp/map", + list: "amqp/list", + unicode: "text/plain; charset=utf8", + buffer: None, + str: None, + None.__class__: None + } + +TYPE_CODEC={ + "amqp/map": codec("map"), + "amqp/list": codec("list"), + "text/plain; charset=utf8": (lambda x: x.encode("utf8"), lambda x: x.decode("utf8")), + None: (lambda x: x, lambda x: x) + } + +def get_type(content): + return TYPE_MAPPINGS[content.__class__] + +def get_codec(content_type): + return TYPE_CODEC[content_type] + +class Message: + + """ + A message consists of a standard set of fields, an application + defined set of properties, and some content. + + @type id: str + @ivar id: the message id + @type user_id: ??? + @ivar user_id: the user-id of the message producer + @type to: ??? + @ivar to: ??? + @type reply_to: ??? + @ivar reply_to: ??? + @type correlation_id: str + @ivar correlation_id: a correlation-id for the message + @type properties: dict + @ivar properties: application specific message properties + @type content_type: str + @ivar content_type: the content-type of the message + @type content: str, unicode, buffer, dict, list + @ivar content: the message content + """ + + def __init__(self, content=None): + """ + Construct a new message with the supplied content. The + content-type of the message will be automatically inferred from + type of the content parameter. + + @type content: str, unicode, buffer, dict, list + @param content: the message content + """ + self.id = None + self.subject = None + self.user_id = None + self.to = None + self.reply_to = None + self.correlation_id = None + self.properties = {} + self.content_type = get_type(content) + self.content = content + + def __repr__(self): + return "Message(%r)" % self.content + +__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message", + "Empty", "timestamp", "uuid4"] Modified: qpid/trunk/qpid/python/qpid/testlib.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=781044&r1=781043&r2=781044&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/testlib.py (original) +++ qpid/trunk/qpid/python/qpid/testlib.py Tue Jun 2 14:24:57 2009 @@ -412,7 +412,7 @@ sock = connect(host or testrunner.host, port or testrunner.port) if testrunner.url.scheme == URL.AMQPS: sock = ssl(sock) - conn = Connection(sock, testrunner.spec, username=testrunner.user, + conn = Connection(sock, username=testrunner.user, password=testrunner.password) conn.start(timeout=10) return conn Added: qpid/trunk/qpid/python/qpid/tests/__init__.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/__init__.py?rev=781044&view=auto ============================================================================== --- qpid/trunk/qpid/python/qpid/tests/__init__.py (added) +++ qpid/trunk/qpid/python/qpid/tests/__init__.py Tue Jun 2 14:24:57 2009 @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Test: + + def __init__(self, name): + self.name = name + + def configure(self, config): + self.config = config + +import messaging Added: qpid/trunk/qpid/python/qpid/tests/messaging.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=781044&view=auto ============================================================================== --- qpid/trunk/qpid/python/qpid/tests/messaging.py (added) +++ qpid/trunk/qpid/python/qpid/tests/messaging.py Tue Jun 2 14:24:57 2009 @@ -0,0 +1,402 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# setup, usage, teardown, errors(sync), errors(async), stress, soak, +# boundary-conditions, config + +import time +from qpid.tests import Test +from qpid.messaging import Connection, Disconnected, Empty, Message, uuid4 +from Queue import Queue, Empty as QueueEmpty + +class Base(Test): + + def setup_connection(self): + return None + + def setup_session(self): + return None + + def setup_sender(self): + return None + + def setup_receiver(self): + return None + + def setup(self): + self.broker = self.config.broker + self.conn = self.setup_connection() + self.ssn = self.setup_session() + self.snd = self.setup_sender() + self.rcv = self.setup_receiver() + + def teardown(self): + if self.conn is not None and self.conn.connected(): + self.conn.close() + + def ping(self, ssn): + # send a message + sender = ssn.sender("ping-queue") + content = "ping[%s]" % uuid4() + sender.send(content) + receiver = ssn.receiver("ping-queue") + msg = receiver.fetch(timeout=0) + ssn.acknowledge() + assert msg.content == content + + def drain(self, rcv): + msgs = [] + try: + while True: + msgs.append(rcv.fetch(0)) + except Empty: + pass + return msgs + +class SetupTests(Base): + + def testOpen(self): + # XXX: need to flesh out URL support/syntax + self.conn = Connection.open(self.broker.host, self.broker.port) + self.ping(self.conn.session()) + + def testConnect(self): + # XXX: need to flesh out URL support/syntax + self.conn = Connection(self.broker.host, self.broker.port) + self.conn.connect() + self.ping(self.conn.session()) + +class ConnectionTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port) + + def testSessionAnon(self): + ssn1 = self.conn.session() + ssn2 = self.conn.session() + self.ping(ssn1) + self.ping(ssn2) + assert ssn1 is not ssn2 + + def testSessionNamed(self): + ssn1 = self.conn.session("one") + ssn2 = self.conn.session("two") + self.ping(ssn1) + self.ping(ssn2) + assert ssn1 is not ssn2 + assert ssn1 is self.conn.session("one") + assert ssn2 is self.conn.session("two") + + def testDisconnect(self): + ssn = self.conn.session() + self.ping(ssn) + self.conn.disconnect() + import socket + try: + self.ping(ssn) + assert False, "ping succeeded" + except Disconnected: + # this is the expected failure when pinging on a disconnected + # connection + pass + self.conn.connect() + self.ping(ssn) + + def testStart(self): + ssn = self.conn.session() + assert not ssn.started + self.conn.start() + assert ssn.started + ssn2 = self.conn.session() + assert ssn2.started + + def testStop(self): + self.conn.start() + ssn = self.conn.session() + assert ssn.started + self.conn.stop() + assert not ssn.started + ssn2 = self.conn.session() + assert not ssn2.started + + def testClose(self): + self.conn.close() + assert not self.conn.connected() + +class SessionTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port) + + def setup_session(self): + return self.conn.session() + + def testSender(self): + snd = self.ssn.sender("test-snd-queue") + snd2 = self.ssn.sender(snd.target) + assert snd is not snd2 + snd2.close() + + content = "testSender[%s]" % uuid4() + snd.send(content) + rcv = self.ssn.receiver(snd.target) + msg = rcv.fetch(0) + assert msg.content == content + self.ssn.acknowledge(msg) + + def testReceiver(self): + rcv = self.ssn.receiver("test-rcv-queue") + rcv2 = self.ssn.receiver(rcv.source) + assert rcv is not rcv2 + rcv2.close() + + content = "testReceiver[%s]" % uuid4() + snd = self.ssn.sender(rcv.source) + snd.send(content) + msg = rcv.fetch(0) + assert msg.content == content + self.ssn.acknowledge(msg) + + def testStart(self): + rcv = self.ssn.receiver("test-start-queue") + assert not rcv.started + self.ssn.start() + assert rcv.started + rcv = self.ssn.receiver("test-start-queue") + assert rcv.started + + def testStop(self): + self.ssn.start() + rcv = self.ssn.receiver("test-stop-queue") + assert rcv.started + self.ssn.stop() + assert not rcv.started + rcv = self.ssn.receiver("test-stop-queue") + assert not rcv.started + + # XXX, we need a convenient way to assert that required queues are + # empty on setup, and possibly also to drain queues on teardown + def testAcknowledge(self): + # send a bunch of messages + snd = self.ssn.sender("test-ack-queue") + tid = "a" + contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)] + for c in contents: + snd.send(c) + + # drain the queue, verify the messages are there and then close + # without acking + rcv = self.ssn.receiver(snd.target) + msgs = self.drain(rcv) + assert contents == [m.content for m in msgs] + self.ssn.close() + + # drain the queue again, verify that they are all the messages + # were requeued, and ack this time before closing + self.ssn = self.conn.session() + rcv = self.ssn.receiver("test-ack-queue") + msgs = self.drain(rcv) + assert contents == [m.content for m in msgs] + self.ssn.acknowledge() + self.ssn.close() + + # drain the queue a final time and verify that the messages were + # dequeued + self.ssn = self.conn.session() + rcv = self.ssn.receiver("test-ack-queue") + msgs = self.drain(rcv) + assert len(msgs) == 0 + + def testClose(self): + self.ssn.close() + try: + self.ping(self.ssn) + assert False, "ping succeeded" + except Disconnected: + pass + +class ReceiverTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port) + + def setup_session(self): + return self.conn.session() + + def setup_sender(self): + return self.ssn.sender("test-receiver-queue") + + def setup_receiver(self): + return self.ssn.receiver("test-receiver-queue") + + def testListen(self): + msgs = Queue() + def listener(m): + msgs.put(m) + self.ssn.acknowledge(m) + self.rcv.listen(listener) + content = "testListen[%s]" % uuid4() + self.snd.send(content) + try: + msg = msgs.get(timeout=3) + assert False, "did not expect message: %s" % msg + except QueueEmpty: + pass + self.rcv.start() + msg = msgs.get(timeout=3) + assert msg.content == content + + def testFetch(self): + try: + msg = self.rcv.fetch(0) + assert False, "unexpected message: %s" % msg + except Empty: + pass + try: + start = time.time() + msg = self.rcv.fetch(3) + assert False, "unexpected message: %s" % msg + except Empty: + elapsed = time.time() - start + assert elapsed >= 3 + + content = "testListen[%s]" % uuid4() + for i in range(3): + self.snd.send(content) + msg = self.rcv.fetch(0) + assert msg.content == content + msg = self.rcv.fetch(3) + assert msg.content == content + msg = self.rcv.fetch() + assert msg.content == content + self.ssn.acknowledge() + + # XXX: need testStart, testStop and testClose + +class MessageTests(Base): + + def testCreateString(self): + m = Message("string") + assert m.content == "string" + assert m.content_type is None + + def testCreateUnicode(self): + m = Message(u"unicode") + assert m.content == u"unicode" + assert m.content_type == "text/plain; charset=utf8" + + def testCreateMap(self): + m = Message({}) + assert m.content == {} + assert m.content_type == "amqp/map" + + def testCreateList(self): + m = Message([]) + assert m.content == [] + assert m.content_type == "amqp/list" + + def testContentTypeOverride(self): + m = Message() + m.content_type = "text/html; charset=utf8" + m.content = u"<html/>" + assert m.content_type == "text/html; charset=utf8" + +class MessageEchoTests(Base): + + def setup_connection(self): + return Connection.open(self.broker.host, self.broker.port) + + def setup_session(self): + return self.conn.session() + + def setup_sender(self): + return self.ssn.sender("test-message-echo-queue") + + def setup_receiver(self): + return self.ssn.receiver("test-message-echo-queue") + + def check(self, msg): + self.snd.send(msg) + echo = self.rcv.fetch(0) + + assert msg.id == echo.id + assert msg.subject == echo.subject + assert msg.user_id == echo.user_id + assert msg.to == echo.to + assert msg.reply_to == echo.reply_to + assert msg.correlation_id == echo.correlation_id + assert msg.properties == echo.properties + assert msg.content_type == echo.content_type + assert msg.content == echo.content + + self.ssn.acknowledge(echo) + + def testStringContent(self): + self.check(Message("string")) + + def testUnicodeContent(self): + self.check(Message(u"unicode")) + + + TEST_MAP = {"key1": "string", + "key2": u"unicode", + "key3": 3, + "key4": -3, + "key5": 3.14, + "key6": -3.14, + "key7": ["one", 2, 3.14], + "key8": []} + + def testMapContent(self): + self.check(Message(MessageEchoTests.TEST_MAP)) + + def testListContent(self): + self.check(Message([])) + self.check(Message([1, 2, 3])) + self.check(Message(["one", 2, 3.14, {"four": 4}])) + + def testProperties(self): + msg = Message() + msg.to = "to-address" + msg.subject = "subject" + msg.correlation_id = str(uuid4()) + msg.properties = MessageEchoTests.TEST_MAP + msg.reply_to = "reply-address" + self.check(msg) + +class TestTestsXXX(Test): + + def testFoo(self): + print "this test has output" + + def testBar(self): + print "this test "*8 + print "has"*10 + print "a"*75 + print "lot of"*10 + print "output"*10 + + def testQux(self): + import sys + sys.stdout.write("this test has output with no newline") + + def testQuxFail(self): + import sys + sys.stdout.write("this test has output with no newline") + fdsa --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org