Re: [python]: sketch of high-level API

2009-04-22 Thread Rafael Schloming

Jonathan Robie wrote:

I like what I see, I have a bunch of questions.


from qpid.api import *

# open a connection
conn = Connection.open("localhost")
conn.start()



If I start the connection before creating sessions, does that mean each 
connection starts as soon as it is created?


Assuming you actually mean each *session* starts as soon as it is 
created, then yes.
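
A minimal sketch of that behaviour, assuming the connection simply hands its current started flag to each new session (the patch does pass self.started to the Session constructor). ToyConnection and ToySession are hypothetical stand-ins, and the propagation in start() is an assumption rather than something shown in the patch:

```python
class ToySession(object):
    """Hypothetical stand-in for a session; only tracks the started flag."""

    def __init__(self, name, started):
        self.name = name
        self.started = started


class ToyConnection(object):
    """Hypothetical stand-in for a connection; no networking involved."""

    def __init__(self):
        self.started = False
        self.sessions = {}

    def start(self):
        # assumption: starting the connection starts every existing session
        self.started = True
        for ssn in self.sessions.values():
            ssn.started = True

    def session(self, name):
        # a new session inherits whatever state the connection is in
        if name not in self.sessions:
            self.sessions[name] = ToySession(name, self.started)
        return self.sessions[name]


conn = ToyConnection()
early = conn.session("early")   # created before start(): not yet started
conn.start()
late = conn.session("late")     # created after start(): started immediately
```

Under these assumptions, a session created before conn.start() stays idle until the connection is started, and one created afterwards is live from the outset.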



# create a session
ssn = conn.session()

# send a message
sender = ssn.sender("my-queue")
sender.send("Hello World!")
# fetch the message
receiver = ssn.receiver("my-queue")
msg = receiver.fetch(0)

print msg

# acknowledge the message
ssn.acknowledge(msg)


How does this work with Topic Exchanges? The XML Exchange?


That part's not fully fleshed out yet. The intention is you'd somehow 
pass something to ssn.sender that would refer to the exchange, and at 
the other end you'd pass something to ssn.receiver that would specify 
the binding, e.g.:


sender = ssn.sender("my-exchange")
receiver = ssn.receiver("my-exchange", filter="...")


Could there be a default timeout for receiver.fetch()?


There could be.
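
One way a default timeout might look, sketched against an in-memory queue rather than a broker. ToyReceiver, default_timeout, and deliver are hypothetical names that do not appear in the patch; Empty is the standard library exception:

```python
try:
    from queue import Queue, Empty   # Python 3
except ImportError:
    from Queue import Queue, Empty   # Python 2

# sentinel, so that an explicit timeout=None can still mean "block forever"
_UNSET = object()


class ToyReceiver(object):
    """Hypothetical stand-in for a receiver; not the qpid.api class."""

    def __init__(self, default_timeout=None):
        self.default_timeout = default_timeout
        self._incoming = Queue()

    def deliver(self, msg):
        # stands in for the broker pushing a message to this receiver
        self._incoming.put(msg)

    def fetch(self, timeout=_UNSET):
        # fall back to the receiver-wide default when no timeout is given
        if timeout is _UNSET:
            timeout = self.default_timeout
        if timeout is None:
            return self._incoming.get()              # block until a message arrives
        if timeout == 0:
            return self._incoming.get_nowait()       # raise Empty right away
        return self._incoming.get(timeout=timeout)   # raise Empty after the wait
```

With a receiver built as ToyReceiver(default_timeout=0), a bare fetch() would behave like the fetch(0) calls in the example, while an explicit argument still overrides the default per call.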


# define a listener
def printer(msg):
  print "L: %r" % msg
  ssn.acknowledge(msg)

# register the listener
receiver.listen(printer)

# send a bunch more messages
for content in ["this", "is", "a", "test"]:
  sender.send(content)

# disconnect for a while
conn.stop()
conn.disconnect()

# reconnect and continue doing work
conn.connect()
conn.start()

# unregister the listener
receiver.listen(None)


Can a receiver have only one listener?


Yes
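
If several callbacks are wanted, one listener can fan out to the rest. A small sketch; fan_out is a hypothetical helper and not part of the API:

```python
def fan_out(*listeners):
    """Combine several callbacks into a single listener."""
    def dispatch(msg):
        # call every callback in registration order with the same message
        for listener in listeners:
            listener(msg)
    return dispatch


seen = []
lengths = []
listener = fan_out(seen.append, lambda msg: lengths.append(len(msg)))

# with the sketched API this would be registered via receiver.listen(listener);
# here it is invoked directly to show the dispatch
listener("test")
```

Only one of the combined callbacks should acknowledge the message.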


# send more messages
sender.send("Hello Again!")

# drain the queue
while True:
  try:
    msg = receiver.fetch(0)
    print "D: %r" % msg
  except Empty:
    break

# acknowledge all outstanding messages
ssn.acknowledge()
print "done"

# close the connection
conn.close()


How do I set delivery properties? Message properties?


The API doesn't expose a distinction between delivery and message 
properties. The way you'd set various headerish things is like this:


msg = Message("this is the content")
msg.content_type = "text/html; charset=utf8"
msg.properties["my-custom-property"] = "foo"

sender.send(msg)

If you pass the Sender.send method something that isn't a message, it 
will assume it is the content and construct the message for you, e.g. 
sender.send("content") is equivalent to sender.send(Message("content")). 
Either one will default the content_type to "text/plain; charset=utf8".
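
The coercion described above can be sketched roughly as follows. This is a toy model, not the code in the patch: ToySender and its sent list are hypothetical, and Message models only the attributes mentioned here:

```python
class Message(object):
    """Toy message; only content, content_type and properties are modeled."""

    def __init__(self, content=None, content_type="text/plain; charset=utf8"):
        self.content = content
        self.content_type = content_type
        self.properties = {}


class ToySender(object):
    """Hypothetical sender that records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    def send(self, obj):
        # anything that is not already a Message is treated as the content
        if isinstance(obj, Message):
            msg = obj
        else:
            msg = Message(obj)
        self.sent.append(msg)
        return msg


sender = ToySender()
plain = sender.send("content")             # wrapped in a Message automatically
explicit = sender.send(Message("content")) # passed through unchanged
```

Both calls leave a message with the same content and the default content_type in sender.sent.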



How do I create queues? Bindings?


On the whole this API isn't intended to address creating/configuring 
internal broker entities, but rather just sending/receiving messages, so 
I wouldn't expect to see anything for general purpose queue 
creation/deletion/binding. Obviously there does need to be some indirect 
queue creating/binding for things like topics and temp queues, but 
beyond that I would expect people to create their queues via a distinct 
TBD API, possibly QMF-based or something similar. That said, this area 
does need some more thought.


--Rafael


-
Apache Qpid - AMQP Messaging Implementation
Project:  http://qpid.apache.org
Use/Interact: mailto:dev-subscr...@qpid.apache.org



Re: [python]: sketch of high-level API

2009-04-22 Thread Jonathan Robie

I like what I see, I have a bunch of questions.


from qpid.api import *

# open a connection
conn = Connection.open("localhost")
conn.start()



If I start the connection before creating sessions, does that mean each 
connection starts as soon as it is created?



# create a session
ssn = conn.session()

# send a message
sender = ssn.sender("my-queue")
sender.send("Hello World!")
# fetch the message
receiver = ssn.receiver("my-queue")
msg = receiver.fetch(0)

print msg

# acknowledge the message
ssn.acknowledge(msg)


How does this work with Topic Exchanges? The XML Exchange?

Could there be a default timeout for receiver.fetch()?



# define a listener
def printer(msg):
  print "L: %r" % msg
  ssn.acknowledge(msg)

# register the listener
receiver.listen(printer)

# send a bunch more messages
for content in ["this", "is", "a", "test"]:
  sender.send(content)

# disconnect for a while
conn.stop()
conn.disconnect()

# reconnect and continue doing work
conn.connect()
conn.start()

# unregister the listener
receiver.listen(None)


Can a receiver have only one listener?


# send more messages
sender.send("Hello Again!")

# drain the queue
while True:
  try:
    msg = receiver.fetch(0)
    print "D: %r" % msg
  except Empty:
    break

# acknowledge all outstanding messages
ssn.acknowledge()
print "done"

# close the connection
conn.close()


How do I set delivery properties? Message properties?

How do I create queues? Bindings?

Jonathan




Re: [python]: sketch of high-level API

2009-04-21 Thread Gordon Sim

Rafael Schloming wrote:

To play with it:

  cd qpid/python
  patch -p0 < api.patch
  python example.py

The html.tgz tarball contains the API-doc as generated by epydoc. This 
includes a list of areas that still need to be fleshed out, improved, or 
are just missing.


Please have at it and let me know what you think.


On the whole, I really like this. I'd be interested in seeing how a 
pub-sub example would look for comparison.







[python]: sketch of high-level API

2009-04-18 Thread Rafael Schloming

To play with it:

  cd qpid/python
  patch -p0 < api.patch
  python example.py

The html.tgz tarball contains the API-doc as generated by epydoc. This 
includes a list of areas that still need to be fleshed out, improved, or 
are just missing.


Please have at it and let me know what you think.

--Rafael

Index: qpid/api.py
===
--- qpid/api.py	(revision 0)
+++ qpid/api.py	(revision 0)
@@ -0,0 +1,654 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A candidate high level messaging API for python.
+
+Areas that still need work:
+
+  - asynchronous send
+  - asynchronous error notification
+  - definition of the arguments for L{Session.sender} and L{Session.receiver}
+  - standard L{Message} properties
+  - L{Message} content encoding
+  - protocol negotiation/multiprotocol impl
+"""
+
+import connection, time, sys, traceback
+from codec010 import StringCodec
+from datatypes import timestamp, uuid4, RangedSet, Message as Message010
+from session import Client, INCOMPLETE
+from spec import SPEC
+from threading import Thread, RLock, Condition
+from util import connect
+
+static = staticmethod
+
+def synchronized(meth):
+  def sync_wrapper(self, *args, **kwargs):
+    self.lock()
+    try:
+      return meth(self, *args, **kwargs)
+    finally:
+      self.unlock()
+  return sync_wrapper
+
+class Lockable(object):
+
+  def lock(self):
+    self._lock.acquire()
+
+  def unlock(self):
+    self._lock.release()
+
+  def wait(self, predicate, timeout=None):
+    passed = 0
+    start = time.time()
+    while not predicate():
+      if timeout is None:
+        # using the timed wait prevents keyboard interrupts from being
+        # blocked while waiting
+        self._condition.wait(3)
+      elif passed < timeout:
+        self._condition.wait(timeout - passed)
+      else:
+        return False
+      passed = time.time() - start
+    return True
+
+  def notify(self):
+    self._condition.notify()
+
+  def notifyAll(self):
+    self._condition.notifyAll()
+
+AMQP_PORT = 5672
+AMQPS_PORT = 5671
+
+class Connection(Lockable):
+
+  """
+  A Connection manages a group of L{Sessions} and connects
+  them with a remote endpoint.
+  """
+
+  @static
+  def open(host, port=AMQP_PORT):
+    """
+    Creates an AMQP connection and connects it to the given host and port.
+
+    @type host: str
+    @param host: the name or ip address of the remote host
+    @type port: int
+    @param port: the port number of the remote host
+    @rtype: Connection
+    @return: a connected Connection
+    """
+    conn = Connection(host, port)
+    conn.connect()
+    return conn
+
+  def __init__(self, host, port=AMQP_PORT):
+    """
+    Creates a connection. A newly created connection must be connected
+    with the Connection.connect() method before it can be started.
+
+    @type host: str
+    @param host: the name or ip address of the remote host
+    @type port: int
+    @param port: the port number of the remote host
+    @rtype: Connection
+    @return: a disconnected Connection
+    """
+    self.host = host
+    self.port = port
+    self.started = False
+    self._conn = None
+    self.id = str(uuid4())
+    self.session_counter = 0
+    self.sessions = {}
+    self._lock = RLock()
+    self._condition = Condition(self._lock)
+
+  @synchronized
+  def session(self, name=None):
+    """
+    Creates or retrieves the named session. If the name is omitted or
+    None, then a unique name is chosen based on a randomly generated
+    uuid.
+
+    @type name: str
+    @param name: the session name
+    @rtype: Session
+    @return: the named Session
+    """
+
+    if name is None:
+      name = "%s:%s" % (self.id, self.session_counter)
+      self.session_counter += 1
+
+    if self.sessions.has_key(name):
+      return self.sessions[name]
+    else:
+      ssn = Session(self, name, self.started)
+      self.sessions[name] = ssn
+      if self._conn is not None:
+        ssn._attach()
+      return ssn
+
+  @synchronized
+  def _remove_session(self, ssn):
+    del self.sessions[ssn.name]
+
+  @synchronized
+  def connect(self):
+    """
+    Connect to the remote endpoint.
+    """
+    if self._conn is not Non