Author: rhs
Date: Wed Sep 23 12:30:49 2009
New Revision: 818075

URL: http://svn.apache.org/viewvc?rev=818075&view=rev
Log:
switched API over to select-based driver; added address parser

Added: qpid/trunk/qpid/python/qpid/address.py qpid/trunk/qpid/python/qpid/selector.py Modified: qpid/trunk/qpid/python/qpid/driver.py qpid/trunk/qpid/python/qpid/messaging.py Added: qpid/trunk/qpid/python/qpid/address.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/address.py?rev=818075&view=auto ============================================================================== --- qpid/trunk/qpid/python/qpid/address.py (added) +++ qpid/trunk/qpid/python/qpid/address.py Wed Sep 23 12:30:49 2009 @@ -0,0 +1,140 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +import re + +TYPES = [] + +class Type: + + def __init__(self, name, pattern=None): + self.name = name + self.pattern = pattern + if self.pattern: + TYPES.append(self) + + def __repr__(self): + return self.name + +LBRACE = Type("LBRACE", r"\{") +RBRACE = Type("RBRACE", r"\}") +COLON = Type("COLON", r":") +COMMA = Type("COMMA", r",") +ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_]*') +NUMBER = Type("NUMBER", r'[+-]?[0-9]*\.?[0-9]+') +STRING = Type("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""") +WSPACE = Type("WSPACE", r"[ \n\r\t]+") +EOF = Type("EOF") + +class Token: + + def __init__(self, type, value): + self.type = type + self.value = value + + def __repr__(self): + return "%s: %r" % (self.type, self.value) + +joined = "|".join(["(%s)" % t.pattern for t in TYPES]) +LEXER = re.compile(joined) + +def lex(st): + pos = 0 + while pos < len(st): + m = LEXER.match(st, pos) + if m is None: + raise ValueError(repr(st[pos:])) + else: + idx = m.lastindex + t = Token(TYPES[idx - 1], m.group(idx)) + yield t + pos = m.end() + yield Token(EOF, None) + +class ParseError(Exception): pass + +class EOF(Exception): pass + +class Parser: + + def __init__(self, tokens): + self.tokens = [t for t in tokens if t.type is not WSPACE] + self.idx = 0 + + def next(self): + return self.tokens[self.idx] + + def matches(self, *types): + return self.next().type in types + + def eat(self, *types): + if types and not self.matches(*types): + raise ParseError("expecting %s -- got %s" % (", ".join(map(str, types)), self.next())) + else: + t = self.next() + self.idx += 1 + return t + + def parse(self): + result = self.address() + self.eat(EOF) + return result + + def address(self): + name = self.eat(ID).value + if self.matches(LBRACE): + options = self.map() + else: + options = None + return name, options + + def map(self): + self.eat(LBRACE) + result = {} + while True: + if self.matches(RBRACE): + self.eat(RBRACE) + break + else: + if self.matches(ID): + n, v = self.nameval() + result[n] = v + elif 
self.matches(COMMA): + self.eat(COMMA) + else: + raise ParseError("expecting (ID, COMMA), got %s" % self.next()) + return result + + def nameval(self): + name = self.eat(ID).value + self.eat(COLON) + val = self.value() + return (name, val) + + def value(self): + if self.matches(NUMBER, STRING): + return eval(self.eat().value) + elif self.matches(LBRACE): + return self.map() + else: + raise ParseError("expecting (NUMBER, STRING, LBRACE) got %s" % self.next()) + +def parse(addr): + return Parser(lex(addr)).parse() + +__all__ = ["parse"] Modified: qpid/trunk/qpid/python/qpid/driver.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=818075&r1=818074&r2=818075&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/driver.py (original) +++ qpid/trunk/qpid/python/qpid/driver.py Wed Sep 23 12:30:49 2009 @@ -17,13 +17,15 @@ # under the License. # -import compat, connection, socket, sys, time +import compat, connection, socket, struct, sys, time from concurrency import synchronized -from datatypes import RangedSet, Message as Message010 -from exceptions import Timeout +from datatypes import RangedSet, Serial +from exceptions import Timeout, VersionError +from framing import OpEncoder, SegmentEncoder, FrameEncoder, FrameDecoder, SegmentDecoder, OpDecoder from logging import getLogger from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED -from ops import delivery_mode +from ops import * +from selector import Selector from session import Client, INCOMPLETE, SessionDetached from threading import Condition, Thread from util import connect @@ -50,145 +52,361 @@ def __init__(self, target): self.target = target +# XXX + DURABLE_DEFAULT=True +# XXX + FILTER_DEFAULTS = { "topic": Pattern("*") } -def delegate(handler, session): - class Delegate(Client): +# XXX + +CLIENT_PROPERTIES = {"product": "qpid python client", + "version": "development", + "platform": os.name, + 
"qpid.client_process": os.path.basename(sys.argv[0]), + "qpid.client_pid": os.getpid(), + "qpid.client_ppid": os.getppid()} + +def noop(): pass + +class SessionState: + + def __init__(self, driver, session, name, channel): + self.driver = driver + self.session = session + self.name = name + self.channel = channel + self.detached = False + self.committing = False + self.aborting = False + + # sender state + self.sent = Serial(0) + self.acknowledged = RangedSet() + self.completions = {} + self.min_completion = self.sent + self.max_completion = self.sent + + # receiver state + self.received = None + self.executed = RangedSet() + + # XXX: need to periodically exchange completion/known_completion + + def write_cmd(self, cmd, completion=noop): + cmd.id = self.sent + self.sent += 1 + self.completions[cmd.id] = completion + self.max_completion = cmd.id + self.write_op(cmd) + + def write_op(self, op): + op.channel = self.channel + self.driver.write_op(op) - def message_transfer(self, cmd): - return handler._message_transfer(session, cmd) - return Delegate +# XXX +HEADER="!4s4B" + +EMPTY_DP = DeliveryProperties() +EMPTY_MP = MessageProperties() class Driver: def __init__(self, connection): self.connection = connection self._lock = self.connection._lock - self._wakeup_cond = Condition() - self._socket = None - self._conn = None + + self._selector = Selector.default() + self.reset() + + def reset(self): + self._opening = False + self._closing = False self._connected = False self._attachments = {} - self._modcount = self.connection._modcount - self.thread = Thread(target=self.run) - self.thread.setDaemon(True) - # XXX: need to figure out how to join on this thread + self._channel_max = 65536 + self._channels = 0 + self._sessions = {} + + self._socket = None + self._buf = "" + self._hdr = "" + self._op_enc = OpEncoder() + self._seg_enc = SegmentEncoder() + self._frame_enc = FrameEncoder() + self._frame_dec = FrameDecoder() + self._seg_dec = SegmentDecoder() + self._op_dec = 
OpDecoder() + self._timeout = None + + for ssn in self.connection.sessions.values(): + for m in ssn.acked + ssn.unacked + ssn.incoming: + m._transfer_id = None + for rcv in ssn.receivers: + rcv.impending = rcv.received + + @synchronized def wakeup(self): - self._wakeup_cond.acquire() - try: - self._wakeup_cond.notifyAll() - finally: - self._wakeup_cond.release() + self.dispatch() + self._selector.wakeup() def start(self): - self.thread.start() + self._selector.register(self) - def run(self): - while True: - self._wakeup_cond.acquire() + def fileno(self): + return self._socket.fileno() + + @synchronized + def reading(self): + return self._socket is not None + + @synchronized + def writing(self): + return self._socket is not None and self._buf + + @synchronized + def timing(self): + return self._timeout + + @synchronized + def readable(self): + error = None + recoverable = False + try: + data = self._socket.recv(64*1024) + if data: + log.debug("READ: %r", data) + else: + error = ("connection aborted",) + recoverable = True + except socket.error, e: + error = (e,) + recoverable = True + + if not error: try: - if self.connection._modcount <= self._modcount: - self._wakeup_cond.wait(10) - finally: - self._wakeup_cond.release() - self.dispatch(self.connection._modcount) + if len(self._hdr) < 8: + r = 8 - len(self._hdr) + self._hdr += data[:r] + data = data[r:] + + if len(self._hdr) == 8: + self.do_header(self._hdr) + + self._frame_dec.write(data) + self._seg_dec.write(*self._frame_dec.read()) + self._op_dec.write(*self._seg_dec.read()) + for op in self._op_dec.read(): + self.assign_id(op) + log.debug("RCVD: %r", op) + op.dispatch(self) + except VersionError, e: + error = (e,) + except: + msg = compat.format_exc() + error = (msg,) + + if error: + self._socket.close() + self.reset() + if recoverable and self.connection.reconnect: + self._timeout = time.time() + 3 + log.warn("recoverable error: %s" % error) + log.warn("sleeping 3 seconds") + else: + self.connection.error = 
error + + self.connection._waiter.notifyAll() + + def assign_id(self, op): + if isinstance(op, Command): + sst = self.get_sst(op) + op.id = sst.received + sst.received += 1 + + @synchronized + def writeable(self): + n = self._socket.send(self._buf) + log.debug("SENT: %r", self._buf[:n]) + self._buf = self._buf[n:] @synchronized - def dispatch(self, modcount): + def timeout(self): + log.warn("retrying ...") + self.dispatch() + self.connection._waiter.notifyAll() + + def write_op(self, op): + log.debug("SENT: %r", op) + self._op_enc.write(op) + self._seg_enc.write(*self._op_enc.read()) + self._frame_enc.write(*self._seg_enc.read()) + self._buf += self._frame_enc.read() + + def do_header(self, hdr): + cli_major = 0; cli_minor = 10 + magic, _, _, major, minor = struct.unpack(HEADER, hdr) + if major != cli_major or minor != cli_minor: + raise VersionError("client: %s-%s, server: %s-%s" % + (cli_major, cli_minor, major, minor)) + + def do_connection_start(self, start): + # XXX: should we use some sort of callback for this? + r = "\0%s\0%s" % (self.connection.username, self.connection.password) + m = self.connection.mechanism + self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES, + mechanism=m, response=r)) + + def do_connection_tune(self, tune): + # XXX: is heartbeat protocol specific? 
+ if tune.channel_max is not None: + self.channel_max = tune.channel_max + self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, + channel_max=self.channel_max)) + self.write_op(ConnectionOpen()) + + def do_connection_open_ok(self, open_ok): + self._connected = True + # XXX: maybe think about a more generic way to catchup with + # deferred work + self.dispatch() + + def connection_heartbeat(self, hrt): + self.write_op(ConnectionHeartbeat()) + + def do_connection_close(self, close): + self.write_op(ConnectionCloseOk()) + if close.reply_ok != close_code.normal: + self.connection.error = (close.reply_code, close.reply_text) + # XXX: should we do a half shutdown on the socket here? + # XXX: we really need to test this, we may end up reporting a + # connection abort after this, if we were to do a shutdown on read + # and stop reading, then we wouldn't report the abort, that's + # probably the right thing to do + + def do_connection_close_ok(self, close_ok): + self._socket.close() + self.reset() + + def do_session_attached(self, atc): + pass + + def do_session_command_point(self, cp): + sst = self.get_sst(cp) + sst.received = cp.command_id + + def do_session_completed(self, sc): + sst = self.get_sst(sc) + for r in sc.commands: + sst.acknowledged.add(r.lower, r.upper) + + if not sc.commands.empty(): + while sst.min_completion in sc.commands: + if sst.completions.has_key(sst.min_completion): + sst.completions.pop(sst.min_completion)() + sst.min_completion += 1 + + def session_known_completed(self, kcmp): + sst = self.get_sst(kcmp) + executed = RangedSet() + for e in sst.executed.ranges: + for ke in kcmp.ranges: + if e.lower in ke and e.upper in ke: + break + else: + executed.add_range(e) + sst.executed = completed + + def do_session_flush(self, sf): + sst = self.get_sst(sf) + if sf.expected: + if sst.received is None: + exp = None + else: + exp = RangedSet(sst.received) + sst.write_op(SessionExpected(exp)) + if sf.confirmed: + 
sst.write_op(SessionConfirmed(sst.executed)) + if sf.completed: + sst.write_op(SessionCompleted(sst.executed)) + + def dispatch(self): try: - if self._conn is None and self.connection._connected: + if self._socket is None and self.connection._connected and not self._opening: self.connect() - elif self._conn is not None and not self.connection._connected: + elif self._socket is not None and not self.connection._connected and not self._closing: self.disconnect() - if self._conn is not None: + if self._connected and not self._closing: for ssn in self.connection.sessions.values(): self.attach(ssn) self.process(ssn) - - exi = None except: - exi = sys.exc_info() - - if exi: msg = compat.format_exc() - recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer", - "Bad file descriptor", "start timed out", "Broken pipe"] - for r in recoverable: - if self.connection.reconnect and r in msg: - print "waiting to retry" - self.reset() - time.sleep(3) - print "retrying..." 
- return - else: - self.connection.error = (msg,) - - self._modcount = modcount - self.connection._waiter.notifyAll() + self.connection.error = (msg,) def connect(self): - if self._conn is not None: - return try: + # XXX: should make this non blocking self._socket = connect(self.connection.host, self.connection.port) + self._timeout = None except socket.error, e: - raise ConnectError(e) - self._conn = connection.Connection(self._socket) - try: - self._conn.start(timeout=10) - self._connected = True - except connection.VersionError, e: - raise ConnectError(e) - except Timeout: - print "start timed out" - raise ConnectError("start timed out") + if self.connection.reconnect: + self.reset() + self._timeout = time.time() + 3 + log.warn("recoverable error: %s", e) + log.warn("sleeping 3 seconds") + return + else: + raise e + self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) + self._opening = True def disconnect(self): - self._conn.close() - self.reset() - - def reset(self): - self._conn = None - self._connected = False - self._attachments.clear() - for ssn in self.connection.sessions.values(): - for m in ssn.acked + ssn.unacked + ssn.incoming: - m._transfer_id = None - for rcv in ssn.receivers: - rcv.impending = rcv.received - - def connected(self): - return self._conn is not None + self.write_op(ConnectionClose(close_code.normal)) + self._closing = True def attach(self, ssn): - _ssn = self._attachments.get(ssn) - if _ssn is None: - _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn)) - _ssn.auto_sync = False - _ssn.invoke_lock = self._lock - _ssn.lock = self._lock - _ssn.condition = self.connection._condition + sst = self._attachments.get(ssn) + if sst is None: + for i in xrange(0, self.channel_max): + if not self._sessions.has_key(i): + ch = i + break + else: + raise RuntimeError("all channels used") + sst = SessionState(self, ssn, ssn.name, ch) + sst.write_op(SessionAttach(name=ssn.name)) + sst.write_op(SessionCommandPoint(sst.sent, 0)) + 
sst.outgoing_idx = 0 + sst.acked = [] if ssn.transactional: - # XXX: adding an attribute to qpid.session.Session - _ssn.acked = [] - _ssn.tx_select() - self._attachments[ssn] = _ssn + sst.write_cmd(TxSelect()) + self._attachments[ssn] = sst + self._sessions[sst.channel] = sst for snd in ssn.senders: self.link_out(snd) for rcv in ssn.receivers: self.link_in(rcv) - if ssn.closing: - _ssn.close() - del self._attachments[ssn] - ssn.closed = True + if ssn.closing and not sst.detached: + sst.detached = True + sst.write_op(SessionDetach(name=ssn.name)) + + def get_sst(self, op): + return self._sessions[op.channel] + + def do_session_detached(self, dtc): + sst = self._sessions.pop(dtc.channel) + ssn = sst.session + del self._attachments[ssn] + ssn.closed = True def _exchange_query(self, ssn, address): # XXX: auto sync hack is to avoid deadlock on future @@ -197,16 +415,16 @@ return result.get() def link_out(self, snd): - _ssn = self._attachments[snd.session] + sst = self._attachments[snd.session] _snd = self._attachments.get(snd) if _snd is None: _snd = Attachment(snd) node, _snd._subject = parse_addr(snd.target) - result = self._exchange_query(_ssn, node) - if result.not_found: + # XXX: result = self._exchange_query(sst, node) +# if result.not_found: + if True: # XXX: should check 'create' option - _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True) - _ssn.sync() + sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT)) _snd._exchange = "" _snd._routing_key = node else: @@ -221,35 +439,37 @@ return _snd def link_in(self, rcv): - _ssn = self._attachments[rcv.session] + sst = self._attachments[rcv.session] _rcv = self._attachments.get(rcv) if _rcv is None: _rcv = Attachment(rcv) - result = self._exchange_query(_ssn, rcv.source) - if result.not_found: + # XXX: result = self._exchange_query(sst, rcv.source) +# if result.not_found: + _rcv.canceled = False + _rcv.draining = False + if True: _rcv._queue = rcv.source # XXX: should check 'create' option - 
_ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT) + sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT)) else: _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) - _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True) + sst.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True) if rcv.filter is None: f = FILTER_DEFAULTS[result.type] else: f = rcv.filter - f._bind(_ssn, rcv.source, _rcv._queue) - _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination) - _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True) + f._bind(sst, rcv.source, _rcv._queue) + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination)) + sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit)) self._attachments[rcv] = _rcv - # XXX: need to kill syncs - _ssn.sync() if rcv.closing: - _ssn.message_cancel(rcv.destination, sync=True) - # XXX: need to kill syncs - _ssn.sync() - del self._attachments[rcv] - rcv.closed = True + if not _rcv.canceled: + def close_rcv(): + del self._attachments[rcv] + rcv.closed = True + sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv) + _rcv.canceled = True return None else: return _rcv @@ -257,80 +477,83 @@ def process(self, ssn): if ssn.closing: return - _ssn = self._attachments[ssn] + sst = self._attachments[ssn] - while ssn.outgoing: - msg = ssn.outgoing[0] + while sst.outgoing_idx < len(ssn.outgoing): + msg = ssn.outgoing[sst.outgoing_idx] snd = msg._sender self.send(snd, msg) - ssn.outgoing.pop(0) + sst.outgoing_idx += 1 for rcv in ssn.receivers: self.process_receiver(rcv) if ssn.acked: - messages = ssn.acked[:] - ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) - for range in ids: - _ssn.receiver._completed.add_range(range) - ch = _ssn.channel - if ch is None: - raise SessionDetached() - 
ch.session_completed(_ssn.receiver._completed) - _ssn.message_accept(ids, sync=True) - # XXX: really need to make this async so that we don't give up the lock - _ssn.sync() - - # XXX: we're ignoring acks that get lost when disconnected - for m in messages: - ssn.acked.remove(m) - if ssn.transactional: - _ssn.acked.append(m) - - if ssn.committing: - _ssn.tx_commit(sync=True) - # XXX: need to kill syncs - _ssn.sync() - del _ssn.acked[:] - ssn.committing = False - ssn.committed = True - ssn.aborting = False - ssn.aborted = False + messages = [m for m in ssn.acked if m not in sst.acked] + if messages: + # XXX: we're ignoring acks that get lost when disconnected, + # could we deal this via some message-id based purge? + ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) + for range in ids: + sst.executed.add_range(range) + sst.write_op(SessionCompleted(sst.executed)) + def ack_ack(): + for m in messages: + ssn.acked.remove(m) + if not ssn.transactional: + sst.acked.remove(m) + sst.write_cmd(MessageAccept(ids, sync=True), ack_ack) + sst.acked.extend(messages) + + if ssn.committing and not sst.committing: + def commit_ok(): + del sst.acked[:] + ssn.committing = False + ssn.committed = True + ssn.aborting = False + ssn.aborted = False + sst.write_cmd(TxCommit(sync=True), commit_ok) + sst.committing = True + + if ssn.aborting and not sst.aborting: + sst.aborting = True + def do_rb(): + messages = sst.acked + ssn.unacked + ssn.incoming + ids = RangedSet(*[m._transfer_id for m in messages]) + for range in ids: + sst.executed.add_range(range) + sst.write_op(SessionCompleted(sst.executed)) + sst.write_cmd(MessageRelease(ids)) + sst.write_cmd(TxRollback(sync=True), do_rb_ok) + + def do_rb_ok(): + del ssn.incoming[:] + del ssn.unacked[:] + del sst.acked[:] + + for rcv in ssn.receivers: + rcv.impending = rcv.received + rcv.returned = rcv.received + # XXX: do we need to update granted here as well? 
+ + for rcv in ssn.receivers: + self.process_receiver(rcv) + + ssn.aborting = False + ssn.aborted = True + ssn.committing = False + ssn.committed = False + sst.aborting = False - if ssn.aborting: for rcv in ssn.receivers: - _ssn.message_stop(rcv.destination) - _ssn.sync() - - messages = _ssn.acked + ssn.unacked + ssn.incoming - ids = RangedSet(*[m._transfer_id for m in messages]) - for range in ids: - _ssn.receiver._completed.add_range(range) - _ssn.channel.session_completed(_ssn.receiver._completed) - _ssn.message_release(ids) - _ssn.tx_rollback(sync=True) - _ssn.sync() - - del ssn.incoming[:] - del ssn.unacked[:] - del _ssn.acked[:] - - for rcv in ssn.receivers: - rcv.impending = rcv.received - rcv.returned = rcv.received - # XXX: do we need to update granted here as well? - - for rcv in ssn.receivers: - self.process_receiver(rcv) - - ssn.aborting = False - ssn.aborted = True - ssn.committing = False - ssn.committed = False + sst.write_cmd(MessageStop(rcv.destination)) + sst.write_cmd(ExecutionSync(sync=True), do_rb) def grant(self, rcv): - _ssn = self._attachments[rcv.session] + sst = self._attachments[rcv.session] _rcv = self.link_in(rcv) + if _rcv is None or _rcv.draining: + return if rcv.granted is UNLIMITED: if rcv.impending is UNLIMITED: @@ -343,29 +566,30 @@ delta = max(rcv.granted, rcv.received) - rcv.impending if delta is UNLIMITED: - _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value) - _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value) + sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value)) + sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, UNLIMITED.value)) rcv.impending = UNLIMITED elif delta > 0: - _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value) - _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta) + sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value)) + 
sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta)) rcv.impending += delta elif delta < 0: + _rcv.draining = True + def flush_stop_cmplt(): + rcv.impending = rcv.received + _rcv.draining = False + self.grant(rcv) if rcv.drain: - _ssn.message_flush(rcv.destination, sync=True) + sst.write_cmd(MessageFlush(rcv.destination, sync=True), flush_stop_cmplt) else: - _ssn.message_stop(rcv.destination, sync=True) - # XXX: need to kill syncs - _ssn.sync() - rcv.impending = rcv.received - self.grant(rcv) + sst.write_cmd(MessageStop(rcv.destination, sync=True), flush_stop_cmplt) def process_receiver(self, rcv): if rcv.closed: return self.grant(rcv) def send(self, snd, msg): - _ssn = self._attachments[snd.session] + sst = self._attachments[snd.session] _snd = self.link_out(snd) # XXX: what if subject is specified for a normal queue? @@ -375,16 +599,16 @@ rk = _snd._routing_key # XXX: do we need to query to figure out how to create the reply-to interoperably? if msg.reply_to: - rt = _ssn.reply_to(*parse_addr(msg.reply_to)) + rt = ReplyTo(*parse_addr(msg.reply_to)) else: rt = None - dp = _ssn.delivery_properties(routing_key=rk) - mp = _ssn.message_properties(message_id=msg.id, - user_id=msg.user_id, - reply_to=rt, - correlation_id=msg.correlation_id, - content_type=msg.content_type, - application_headers=msg.properties) + dp = DeliveryProperties(routing_key=rk) + mp = MessageProperties(message_id=msg.id, + user_id=msg.user_id, + reply_to=rt, + correlation_id=msg.correlation_id, + content_type=msg.content_type, + application_headers=msg.properties) if msg.subject is not None: if mp.application_headers is None: mp.application_headers = {} @@ -397,37 +621,43 @@ dp.delivery_mode = delivery_mode.persistent enc, dec = get_codec(msg.content_type) body = enc(msg.content) - _ssn.message_transfer(destination=_snd._exchange, - message=Message010(dp, mp, body), - sync=True) - log.debug("SENT [%s] %s", snd.session, msg) - # XXX: really need to make this async so that we 
don't give up the lock - _ssn.sync() - # XXX: should we log the ack somehow too? - snd.acked += 1 + def msg_acked(): + # XXX: should we log the ack somehow too? + snd.acked += 1 + m = snd.session.outgoing.pop(0) + sst.outgoing_idx -= 1 + assert msg == m + sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp), + payload=body, sync=True), msg_acked) + + def do_message_transfer(self, xfr): + sst = self.get_sst(xfr) + ssn = sst.session - @synchronized - def _message_transfer(self, ssn, cmd): - m = Message010(cmd.payload) - m.headers = cmd.headers - m.id = cmd.id - msg = self._decode(m) - rcv = ssn.receivers[int(cmd.destination)] + msg = self._decode(xfr) + rcv = ssn.receivers[int(xfr.destination)] msg._receiver = rcv if rcv.impending is not UNLIMITED: - assert rcv.received < rcv.impending + assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) rcv.received += 1 log.debug("RECV [%s] %s", ssn, msg) ssn.incoming.append(msg) self.connection._waiter.notifyAll() return INCOMPLETE - def _decode(self, message): - dp = message.get("delivery_properties") - mp = message.get("message_properties") + def _decode(self, xfr): + dp = EMPTY_DP + mp = EMPTY_MP + + for h in xfr.headers: + if isinstance(h, DeliveryProperties): + dp = h + elif isinstance(h, MessageProperties): + mp = h + ap = mp.application_headers enc, dec = get_codec(mp.content_type) - content = dec(message.body) + content = dec(xfr.payload) msg = Message(content) msg.id = mp.message_id if ap is not None: @@ -440,5 +670,5 @@ msg.durable = dp.delivery_mode == delivery_mode.persistent msg.properties = mp.application_headers msg.content_type = mp.content_type - msg._transfer_id = message.id + msg._transfer_id = xfr.id return msg Modified: qpid/trunk/qpid/python/qpid/messaging.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=818075&r1=818074&r2=818075&view=diff ============================================================================== --- 
qpid/trunk/qpid/python/qpid/messaging.py (original) +++ qpid/trunk/qpid/python/qpid/messaging.py Wed Sep 23 12:30:49 2009 @@ -77,7 +77,8 @@ """ @static - def open(host, port=None): + def open(host, port=None, username="guest", password="guest", + mechanism="PLAIN", heartbeat=None): """ Creates an AMQP connection and connects it to the given host and port. @@ -88,11 +89,12 @@ @rtype: Connection @return: a connected Connection """ - conn = Connection(host, port) + conn = Connection(host, port, username, password, mechanism, heartbeat) conn.connect() return conn - def __init__(self, host, port=None): + def __init__(self, host, port=None, username="guest", password="guest", + mechanism="PLAIN", heartbeat=None): """ Creates a connection. A newly created connection must be connected with the Connection.connect() method before it can be started. @@ -106,6 +108,11 @@ """ self.host = host self.port = default(port, AMQP_PORT) + self.username = username + self.password = password + self.mechanism = mechanism + self.heartbeat = heartbeat + self.started = False self.id = str(uuid4()) self.session_counter = 0 Added: qpid/trunk/qpid/python/qpid/selector.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/selector.py?rev=818075&view=auto ============================================================================== --- qpid/trunk/qpid/python/qpid/selector.py (added) +++ qpid/trunk/qpid/python/qpid/selector.py Wed Sep 23 12:30:49 2009 @@ -0,0 +1,157 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import atexit, os, time +from select import select +from threading import Thread, Lock + +class Acceptor: + + def __init__(self, sock, handler): + self.sock = sock + self.handler = handler + + def fileno(self): + return self.sock.fileno() + + def reading(self): + return True + + def writing(self): + return False + + def readable(self): + sock, addr = self.sock.accept() + self.handler(sock) + +class Sink: + + def __init__(self, fd): + self.fd = fd + + def fileno(self): + return self.fd + + def reading(self): + return True + + def readable(self): + os.read(self.fd, 65536) + + def __repr__(self): + return "Sink(%r)" % self.fd + +class Selector: + + lock = Lock() + DEFAULT = None + + @staticmethod + def default(): + Selector.lock.acquire() + try: + if Selector.DEFAULT is None: + sel = Selector() + atexit.register(sel.stop) + sel.start() + Selector.DEFAULT = sel + return Selector.DEFAULT + finally: + Selector.lock.release() + + def __init__(self): + self.selectables = set() + self.reading = set() + self.writing = set() + self.wait_fd, self.wakeup_fd = os.pipe() + self.reading.add(Sink(self.wait_fd)) + self.stopped = False + self.thread = None + + def wakeup(self): + while True: + select([], [self.wakeup_fd], []) + if os.write(self.wakeup_fd, "\0") > 0: + break + + def register(self, selectable): + self.selectables.add(selectable) + self.modify(selectable) + + def _update(self, selectable): + if selectable.reading(): + self.reading.add(selectable) + else: + self.reading.discard(selectable) + if selectable.writing(): + self.writing.add(selectable) + else: + 
self.writing.discard(selectable) + return selectable.timing() + + def modify(self, selectable): + self._update(selectable) + self.wakeup() + + def unregister(self, selectable): + self.reading.discard(selectable) + self.writing.discard(selectable) + self.selectables.discard(selectable) + self.wakeup() + + def start(self): + self.stopped = False + self.thread = Thread(target=self.run) + self.thread.setDaemon(True) + self.thread.start(); + + def run(self): + while not self.stopped: + wakeup = None + for sel in self.selectables.copy(): + t = self._update(sel) + if t is not None: + if wakeup is None: + wakeup = t + else: + wakeup = min(wakeup, t) + + if wakeup is None: + timeout = None + else: + timeout = max(0, wakeup - time.time()) + + rd, wr, ex = select(self.reading, self.writing, (), timeout) + + for sel in wr: + sel.writeable() + + for sel in rd: + sel.readable() + + now = time.time() + for sel in self.selectables.copy(): + w = sel.timing() + if w is not None and now > w: + sel.timeout() + + def stop(self, timeout=None): + self.stopped = True + self.wakeup() + self.thread.join(timeout) + self.thread = None --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org