Author: ritchiem Date: Wed Oct 28 15:34:46 2009 New Revision: 830605 URL: http://svn.apache.org/viewvc?rev=830605&view=rev Log: QPID-2126 - Sync the Python QMF bindings to the current Ruby QMF bindings implementation. Applied patch from Ken Giusti.
Modified: qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/tests/python_agent.py qpid/branches/0.5.x-dev/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp Modified: qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py?rev=830605&r1=830604&r2=830605&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py Wed Oct 28 15:34:46 2009 @@ -21,6 +21,7 @@ import os from threading import Thread from threading import RLock +from threading import Condition import qmfengine from qmfengine import (ACCESS_READ_CREATE, ACCESS_READ_ONLY, ACCESS_READ_WRITE) from qmfengine import (CLASS_EVENT, CLASS_OBJECT) @@ -38,35 +39,60 @@ ## CONNECTION ##============================================================================== -class ConnectionSettings: +class ConnectionSettings(object): #attr_reader :impl def __init__(self, url=None): if url: self.impl = qmfengine.ConnectionSettings(url) else: self.impl = qmfengine.ConnectionSettings() - - + + def set_attr(self, key, val): if type(val) == str: _v = qmfengine.Value(TYPE_LSTR) _v.setString(val) - elif type(val) == bool: - _v = qmfengine.Value(TYPE_BOOL) - _v.setBool(val) elif type(val) == int: _v = qmfengine.Value(TYPE_UINT32) _v.setUint(val) + elif type(val) == bool: + _v = qmfengine.Value(TYPE_BOOL) + _v.setBool(val) else: - raise ArgumentError("Value for attribute '%s' has unsupported type: %s" % ( key, type(val))) - - self.impl.setAttr(key, _v) + raise Exception("Argument error: value for attribute '%s' has unsupported type: %s" % ( key, type(val))) + + good = self.impl.setAttr(key, _v) + if not good: + raise Exception("Argument error: unsupported attribute '%s'" % key ) + + + def get_attr(self, key): + _v = 
self.impl.getAttr(key) + if _v.isString(): + return _v.asString() + elif _v.isUint(): + return _v.asUint() + elif _v.isBool(): + return _v.asBool() + else: + raise Exception("Argument error: value for attribute '%s' has unsupported type: %s" % ( key, str(_v.getType()))) + + + def __getattr__(self, name): + return self.get_attr(name) + + + def __setattr__(self, name, value): + if name == "impl": + return super.__setattr__(self, name, value) + return self.set_attr(name, value) class ConnectionHandler: def conn_event_connected(self): None def conn_event_disconnected(self, error): None + def conn_event_visit(self): None def sess_event_session_closed(self, context, error): None def sess_event_recv(self, context, message): None @@ -80,23 +106,43 @@ self._sockEngine, self._sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) self.impl.setNotifyFd(self._sockEngine.fileno()) self._new_conn_handlers = [] + self._conn_handlers_to_delete = [] self._conn_handlers = [] + self._connected = False self.start() - + + def connected(self): + return self._connected + + + def kick(self): + self._sockEngine.send(".") + # self._sockEngine.flush() Not available with python? 
+ + def add_conn_handler(self, handler): self._lock.acquire() try: self._new_conn_handlers.append(handler) finally: self._lock.release() - self._sockEngine.send("x") + self.kick() + def del_conn_handler(self, handler): + self._lock.acquire() + try: + self._conn_handlers_to_delete.append(handler) + finally: + self._lock.release() + self.kick() + + def run(self): eventImpl = qmfengine.ResilientConnectionEvent() - connected = False new_handlers = [] + del_handlers = [] bt_count = 0 while True: @@ -106,27 +152,33 @@ self._lock.acquire() try: new_handlers = self._new_conn_handlers + del_handlers = self._conn_handlers_to_delete self._new_conn_handlers = [] + self._conn_handlers_to_delete = [] finally: self._lock.release() for nh in new_handlers: self._conn_handlers.append(nh) - if connected: + if self._connected: nh.conn_event_connected() - new_handlers = [] + + for dh in del_handlers: + if dh in self._conn_handlers: + self._conn_handlers.remove(dh) + del_handlers = [] valid = self.impl.getEvent(eventImpl) while valid: try: if eventImpl.kind == qmfengine.ResilientConnectionEvent.CONNECTED: - connected = True + self._connected = True for h in self._conn_handlers: h.conn_event_connected() elif eventImpl.kind == qmfengine.ResilientConnectionEvent.DISCONNECTED: - connected = False + self._connected = False for h in self._conn_handlers: h.conn_event_disconnected(eventImpl.errorText) @@ -146,6 +198,9 @@ self.impl.popEvent() valid = self.impl.getEvent(eventImpl) + + for h in self._conn_handlers: + h.conn_event_visit() @@ -158,7 +213,7 @@ result = self._conn.impl.createSession(label, self, self.handle) - def __del__(self): + def destroy(self): self._conn.impl.destroySession(self.handle) @@ -167,14 +222,30 @@ ## OBJECTS ##============================================================================== -class QmfObject: +class QmfObject(object): # attr_reader :impl, :object_class - def __init__(self, cls): - self.object_class = cls - self.impl = 
qmfengine.Object(self.object_class.impl) + def __init__(self, cls, kwargs={}): + self._cv = Condition() + self._sync_count = 0 + self._sync_result = None + self._allow_sets = False + if kwargs.has_key("broker"): + self._broker = kwargs["broker"] + else: + self._broker = None + if cls: + self.object_class = cls + self.impl = qmfengine.Object(self.object_class.impl) + elif kwargs.has_key("impl"): + self.impl = qmfengine.Object(kwargs["impl"]) + self.object_class = SchemaObjectClass(None, + None, + {"impl":self.impl.getClass()}) + else: + raise Exception("Argument error: required parameter ('impl') not supplied") - def __del__(self): + def destroy(self): self.impl.destroy() @@ -184,6 +255,20 @@ def set_object_id(self, oid): self.impl.setObjectId(oid.impl) + + + def properties(self): + list = [] + for prop in self.object_class.properties: + list.append([prop, self.get_attr(prop.name())]) + return list + + + def statistics(self): + list = [] + for stat in self.object_class.statistics: + list.append([stat, self.get_attr(stat.name())]) + return list def get_attr(self, name): @@ -197,7 +282,7 @@ elif vType == TYPE_LSTR: return val.asString() elif vType == TYPE_ABSTIME: return val.asInt64() elif vType == TYPE_DELTATIME: return val.asUint64() - elif vType == TYPE_REF: return val.asObjectId() + elif vType == TYPE_REF: return ObjectId(val.asObjectId()) elif vType == TYPE_BOOL: return val.asBool() elif vType == TYPE_FLOAT: return val.asFloat() elif vType == TYPE_DOUBLE: return val.asDouble() @@ -264,26 +349,172 @@ self.set_attr(name, self.get_attr(name) - by) + def __setattr__(self, name, value): + # + # Ignore the internal attributes, set them normally... + # + if (name[0] == '_' or + name == 'impl' or + name == 'object_class'): + return super.__setattr__(self, name, value) + + if not self._allow_sets: + raise Exception("'Set' operations not permitted on this object") + # + # If the name matches a property name, set the value of the property. 
+ # + # print "set name=%s" % str(name) + for prop in self.object_class.properties: + if prop.name() == name: + return self.set_attr(name, value) + # + # otherwise, check for a statistic set... + # + for stat in self.object_class.statistics: + if stat.name() == name: + return self.set_attr(name, value) + + # unrecognized name? should I raise an exception? + super.__setattr__(self, name, value) + + + def __getattr__(self, name, *args): + # + # If the name matches a property name, return the value of the property. + # + for prop in self.object_class.properties: + if prop.name() == name: + return self.get_attr(name) + # + # Do the same for statistics + # + for stat in self.object_class.statistics: + if stat.name() == name: + return self.get_attr(name) + # + # If we still haven't found a match for the name, check to see if + # it matches a method name. If so, marshall up the arguments into + # a map, and invoke the method. + # + for method in self.object_class.methods: + if method.name() == name: + argMap = self._marshall(method, args) + return lambda name, argMap : self._invokeMethod(name, argMap) + + # + # This name means nothing to us, pass it up the line to the parent + # class's handler. + # + # print "__getattr__=%s" % str(name) + super.__getattr__(self, name) + + + def _invokeMethod(self, name, argMap): + """ + Private: Helper function that invokes an object's method, and waits for the result. 
+ """ + self._cv.acquire() + try: + timeout = 30 + self._sync_count = 1 + self.impl.invokeMethod(name, argMap, self) + if self._broker: + self._broker.conn.kick() + self._cv.wait(timeout) + if self._sync_count == 1: + raise Exception("Timed out: waiting for response to method call.") + finally: + self._cv.release() + + return self._sync_result + + + def _method_result(self, result): + """ + Called to return the result of a method call on an object + """ + self._cv.acquire(); + try: + self._sync_result = result + self._sync_count -= 1 + self._cv.notify() + finally: + self._cv.release() + + + def _marshall(schema, args): + ''' + Private: Convert a list of arguments (positional) into a Value object of type "map". + Used to create the argument parameter for an object's method invokation. + ''' + # Build a map of the method's arguments + map = qmfengine.Value(TYPE_MAP) + for arg in schema.arguments: + if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT: + map.insert(arg.name, qmfengine.Value(arg.typecode)) + + # install each argument's value into the map + marshalled = Arguments(map) + idx = 0 + for arg in schema.arguments: + if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT: + if args[idx]: + marshalled[arg.name] = args[idx] + idx += 1 + + return marshalled.map + + def _value(self, name): val = self.impl.getValue(name) if not val: - raise ArgumentError("Attribute '%s' not defined for class %s" % (name, self.object_class.impl.getName())) + raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" % + (name, + self.object_class.impl.getClassKey().getPackageName(), + self.object_class.impl.getClassKey().getClassName())) return val +class AgentObject(QmfObject): + def __init__(self, cls, kwargs={}): + QmfObject.__init__(self, cls, kwargs) + self._allow_sets = True + + + def destroy(self): + self.impl.destroy() + + + def set_object_id(self, oid): + self.impl.setObjectId(oid.impl) + + + class ConsoleObject(QmfObject): # 
attr_reader :current_time, :create_time, :delete_time - def __init__(self, cls): - QmfObject.__init__(self, cls) + def __init__(self, cls, kwargs={}): + QmfObject.__init__(self, cls, kwargs) - def update(self): pass - def mergeUpdate(self, newObject): pass + def update(self): + if not self._broker: + raise Exception("No linkage to broker") + newer = self._broker.console.objects(Query({"object_id":object_id})) + if newer.size != 1: + raise Exception("Expected exactly one update for this object, %d present" % int(newer.size)) + self.merge_update(newer[0]) + + + def merge_update(self, newObject): + self.impl.merge(new_object.impl) + + def is_deleted(self): - return self.delete_time > 0 + return self.impl.isDeleted() + + def index(self): pass - def method_missing(self, name, *args): pass @@ -303,8 +534,16 @@ return self.impl.getObjectNumLo() + def broker_bank(self): + return self.impl.getBrokerBank() + + + def agent_bank(self): + return self.impl.getAgentBank() + + def __eq__(self, other): - if self.__class__ != other.__class__: return False + if not isinstance(other, self.__class__): return False return (self.impl.getObjectNumHi() == other.impl.getObjectNumHi() and self.impl.getObjectNumLo() == other.impl.getObjectNumLo()) @@ -312,9 +551,12 @@ def __ne__(self, other): return not self.__eq__(other) + def __repr__(self): + return self.impl.str() -class Arguments: + +class Arguments(object): def __init__(self, map): self.map = map self._by_hash = {} @@ -335,9 +577,30 @@ def __iter__(self): - return _by_hash.__iter__ - - + return self._by_hash.__iter__ + + + def __getattr__(self, name): + if name in self._by_hash: + return self._by_hash[name] + return super.__getattr__(self, name) + + + def __setattr__(self, name, value): + # + # ignore local data members + # + if (name[0] == '_' or + name == 'map'): + return super.__setattr__(self, name, value) + + if name in self._by_hash: + self._by_hash[name] = value + return self.set(name, value) + + return super.__setattr__(self, 
name, value) + + def by_key(self, key): val = self.map.byKey(key) vType = val.getType() @@ -349,7 +612,7 @@ elif vType == TYPE_LSTR: return val.asString() elif vType == TYPE_ABSTIME: return val.asInt64() elif vType == TYPE_DELTATIME: return val.asUint64() - elif vType == TYPE_REF: return val.asObjectId() + elif vType == TYPE_REF: return ObjectId(val.asObjectId()) elif vType == TYPE_BOOL: return val.asBool() elif vType == TYPE_FLOAT: return val.asFloat() elif vType == TYPE_DOUBLE: return val.asDouble() @@ -405,18 +668,65 @@ +class MethodResponse(object): + def __init__(self, impl): + self.impl = qmfengine.MethodResponse(impl) + + + def status(self): + return self.impl.getStatus() + + + def exception(self): + return self.impl.getException() + + + def text(self): + return exception().asString() + + + def args(self): + return Arguments(self.impl.getArgs()) + + + def __getattr__(self, name): + myArgs = self.args() + return myArgs.__getattr__(name) + + + def __setattr__(self, name, value): + if name == 'impl': + return super.__setattr__(self, name, value) + + myArgs = self.args() + return myArgs.__setattr__(name, value) + + + + ##============================================================================== + ## QUERY + ##============================================================================== + + class Query: - def __init__(self, i=None, package="", cls=None, oid=None): - if i: - self.impl = i - else: - if cls: - self.impl = qmfengine.Query(cls, package) - elif oid: - self.impl = qmfengine.Query(oid) + def __init__(self, kwargs={}): + if "impl" in kwargs: + self.impl = kwargs["impl"] + else: + package = '' + if "key" in kwargs: + # construct using SchemaClassKey: + self.impl = qmfengine.Query(kwargs["key"]) + elif "object_id" in kwargs: + self.impl = qmfengine.Query(kwargs["object_id"].impl) else: - raise "Argument error" - + if "package" in kwargs: + package = kwargs["package"] + if "class" in kwargs: + self.impl = qmfengine.Query(kwargs["class"], package) + 
else: + raise Exception("Argument error: invalid arguments, use 'key', 'object_id' or 'class'[,'package']") + def package_name(self): return self.impl.getPackage() def class_name(self): return self.impl.getClass() @@ -437,48 +747,95 @@ class SchemaArgument: #attr_reader :impl def __init__(self, name, typecode, kwargs={}): - self.impl = qmfengine.SchemaArgument(name, typecode) - if kwargs.has_key("dir"): self.impl.setDirection(kwargs["dir"]) - if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) - if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) + if "impl" in kwargs: + self.impl = kwargs["impl"] + else: + self.impl = qmfengine.SchemaArgument(name, typecode) + if kwargs.has_key("dir"): self.impl.setDirection(kwargs["dir"]) + if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) + if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) + + + def name(self): + return self.impl.getName() + + + def direction(self): + return self.impl.getDirection() + + + def typecode(self): + return self.impl.getType() + + + def __repr__(self): + return self.name() class SchemaMethod: - # attr_reader :impl + # attr_reader :impl, arguments def __init__(self, name, kwargs={}): - self.impl = qmfengine.SchemaMethod(name) - if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) - self._arguments = [] + self.arguments = [] + if "impl" in kwargs: + self.impl = kwargs["impl"] + for i in range(self.impl.getArgumentCount()): + self.arguments.append(SchemaArgument(None,None,{"impl":self.impl.getArgument(i)})) + else: + self.impl = qmfengine.SchemaMethod(name) + if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) def add_argument(self, arg): - self._arguments.append(arg) + self.arguments.append(arg) self.impl.addArgument(arg.impl) + def name(self): + return self.impl.getName() + + def __repr__(self): + return self.name() + + class SchemaProperty: #attr_reader :impl def __init__(self, name, typecode, kwargs={}): - self.impl = 
qmfengine.SchemaProperty(name, typecode) - if kwargs.has_key("access"): self.impl.setAccess(kwargs["access"]) - if kwargs.has_key("index"): self.impl.setIndex(kwargs["index"]) - if kwargs.has_key("optional"): self.impl.setOptional(kwargs["optional"]) - if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) - if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) + if "impl" in kwargs: + self.impl = kwargs["impl"] + else: + self.impl = qmfengine.SchemaProperty(name, typecode) + if kwargs.has_key("access"): self.impl.setAccess(kwargs["access"]) + if kwargs.has_key("index"): self.impl.setIndex(kwargs["index"]) + if kwargs.has_key("optional"): self.impl.setOptional(kwargs["optional"]) + if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) + if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) def name(self): return self.impl.getName() + def __repr__(self): + return self.name() + class SchemaStatistic: # attr_reader :impl def __init__(self, name, typecode, kwargs={}): - self.impl = qmfengine.SchemaStatistic(name, typecode) - if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) - if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) + if "impl" in kwargs: + self.impl = kwargs["impl"] + else: + self.impl = qmfengine.SchemaStatistic(name, typecode) + if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) + if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) + + + def name(self): + return self.impl.getName() + + def __repr__(self): + return self.name() @@ -488,60 +845,88 @@ self.impl = i - def get_package(self): - self.impl.getPackageName() + def package_name(self): + return self.impl.getPackageName() - def get_class(self): - self.impl.getClassName() + def class_name(self): + return self.impl.getClassName() + + def __repr__(self): + return self.impl.asString() class SchemaObjectClass: - # attr_reader :impl + # attr_reader :impl, :properties, :statistics, :methods def __init__(self, package, name, kwargs={}): - 
self.impl = qmfengine.SchemaObjectClass(package, name) - self._properties = [] - self._statistics = [] - self._methods = [] + self.properties = [] + self.statistics = [] + self.methods = [] + if "impl" in kwargs: + self.impl = kwargs["impl"] + + for i in range(self.impl.getPropertyCount()): + self.properties.append(SchemaProperty(None, None, {"impl":self.impl.getProperty(i)})) + + for i in range(self.impl.getStatisticCount()): + self.statistics.append(SchemaStatistic(None, None, {"impl":self.impl.getStatistic(i)})) + + for i in range(self.impl.getMethodCount()): + self.methods.append(SchemaMethod(None, {"impl":self.impl.getMethod(i)})) + else: + self.impl = qmfengine.SchemaObjectClass(package, name) def add_property(self, prop): - self._properties.append(prop) + self.properties.append(prop) self.impl.addProperty(prop.impl) def add_statistic(self, stat): - self._statistics.append(stat) + self.statistics.append(stat) self.impl.addStatistic(stat.impl) def add_method(self, meth): - self._methods.append(meth) + self.methods.append(meth) self.impl.addMethod(meth.impl) - def name(self): - return self.impl.getName() - - - def properties(self): - return self._properties + def class_key(self): + return SchemaClassKey(self.impl.getClassKey()) + + + def package_name(self): + return self.impl.getClassKey().getPackageName() + + + def class_name(self): + return self.impl.getClassKey().getClassName() + class SchemaEventClass: - # attr_reader :impl + # attr_reader :impl :arguments def __init__(self, package, name, kwargs={}): - self.impl = qmfengine.SchemaEventClass(package, name) - if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) - self._arguments = [] + self.arguments = [] + if "impl" in kwargs: + self.impl = kwargs["impl"] + for i in range(self.impl.getArgumentCount()): + self.arguments.append(SchemaArgument(nil, nil, {"impl":self.impl.getArgument(i)})) + else: + self.impl = qmfengine.SchemaEventClass(package, name) + if kwargs.has_key("desc"): 
self.impl.setDesc(kwargs["desc"]) def add_argument(self, arg): - self._arguments.append(arg) + self.arguments.append(arg) self.impl.addArgument(arg.impl) + def name(self): + return self.impl.getClassKey().getClassName() + ##============================================================================== ## CONSOLE @@ -562,45 +947,178 @@ -class Console: +class Console(Thread): # attr_reader :impl - def initialize(handler=None, kwargs={}): - self._handler = handler - self.impl = qmfengine.Console() - self._event = qmfengine.ConsoleEvent() - self._broker_list = [] - + def __init__(self, handler=None, kwargs={}): + Thread.__init__(self) + self._handler = handler + self.impl = qmfengine.Console() + self._event = qmfengine.ConsoleEvent() + self._broker_list = [] + self._cv = Condition() + self._sync_count = 0 + self._sync_result = None + self._select = {} + self._cb_cond = Condition() + self.start() + def add_connection(self, conn): broker = Broker(self, conn) - self._broker_list.append(broker) + self._cv.acquire() + try: + self._broker_list.append(broker) + finally: + self._cv.release() return broker - def del_connection(self, broker): pass - - - def get_packages(self): pass - - - def get_classes(self, package): pass - - - def get_schema(self, class_key): pass - - - def bind_package(self, package): pass + def del_connection(self, broker): + broker.shutdown() + self._cv.acquire() + try: + self._broker_list.remove(broker) + finally: + self._cv.release() - def bind_class(self, kwargs = {}): pass + def packages(self): + plist = [] + for i in range(self.impl.packageCount()): + plist.append(self.impl.getPackageName(i)) + return plist + + + def classes(self, package, kind=CLASS_OBJECT): + clist = [] + for i in range(self.impl.classCount(package)): + key = self.impl.getClass(package, i) + class_kind = self.impl.getClassKind(key) + if class_kind == kind: + if kind == CLASS_OBJECT: + clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)})) + elif kind == 
CLASS_EVENT: + clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)})) + return clist + + + def bind_package(self, package): + return self.impl.bindPackage(package) + + + def bind_class(self, kwargs = {}): + if "key" in kwargs: + self.impl.bindClass(kwargs["key"]) + elif "package" in kwargs: + package = kwargs["package"] + if "class" in kwargs: + self.impl.bindClass(package, kwargs["class"]) + else: + self.impl.bindClass(package) + else: + raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']") - def get_agents(self, broker=None): pass + def agents(self, broker=None): + blist = [] + if broker: + blist.append(broker) + else: + self._cv.acquire() + try: + # copy while holding lock + blist = self._broker_list[:] + finally: + self._cv.release() + + agents = [] + for b in blist: + for idx in range(b.impl.agentCount()): + agents.append(AgentProxy(b.impl.getAgent(idx), b)) + + return agents - def get_objects(self, query, kwargs = {}): pass + def objects(self, query, kwargs = {}): + timeout = 30 + temp_args = kwargs.copy() + if type(query) == type({}): + temp_args.update(query) + + if "timeout" in temp_args: + timeout = temp_args["timeout"] + temp_args.pop("timeout") + + if type(query) == type({}): + query = Query(temp_args) + + self._select = {} + for k in temp_args.iterkeys(): + if type(k) == str: + self._select[k] = temp_args[k] + + self._cv.acquire() + try: + self._sync_count = 1 + self._sync_result = [] + broker = self._broker_list[0] + broker.send_query(query.impl, None) + self._cv.wait(timeout) + if self._sync_count == 1: + raise Exception("Timed out: waiting for query response") + finally: + self._cv.release() + + return self._sync_result + def object(self, query, kwargs = {}): + ''' + Return one and only one object or None. 
+ ''' + objs = objects(query, kwargs) + if len(objs) == 1: + return objs[0] + else: + return None + + + def first_object(self, query, kwargs = {}): + ''' + Return the first of potentially many objects. + ''' + objs = objects(query, kwargs) + if objs: + return objs[0] + else: + return None + + + # Check the object against select to check for a match + def _select_match(self, object): + schema_props = object.properties() + for key in self._select.iterkeys(): + for prop in schema_props: + if key == p[0].name() and self._select[key] != p[1]: + return False + return True + + + def _get_result(self, list, context): + ''' + Called by Broker proxy to return the result of a query. + ''' + self._cv.acquire() + try: + for item in list: + if self._select_match(item): + self._sync_result.append(item) + self._sync_count -= 1 + self._cv.notify() + finally: + self._cv.release() + + def start_sync(self, query): pass @@ -610,26 +1128,56 @@ def end_sync(self, sync): pass + def run(self): + while True: + self._cb_cond.acquire() + try: + self._cb_cond.wait(1) + while self.do_console_events(): + pass + finally: + self._cb_cond.release() + + + def start_console_events(self): + self._cb_cond.acquire() + try: + self._cb_cond.notify() + finally: + self._cb_cond.release() + + def do_console_events(self): + ''' + Called by Broker proxy to poll for Console events. Passes the events + onto the ConsoleHandler associated with this Console. 
+ ''' count = 0 valid = self.impl.getEvent(self._event) while valid: count += 1 - print "Console Event:", self._event.kind + # print "Console Event:", self._event.kind if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: - pass + if self._handler: + self._handler.agent_added(AgentProxy(self._event.agent, None)) elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: - pass + if self._handler: + self._handler.agent_deleted(AgentProxy(self._event.agent, None)) elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: - pass + if self._handler: + self._handler.new_package(self._event.name) elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: - pass + if self._handler: + self._handler.new_class(SchemaClassKey(self._event.classKey)) elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: - pass + if self._handler: + self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), + self._event.hasProps, self._event.hasStats) elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: pass elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: - pass + if self._handler: + self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: pass @@ -639,37 +1187,104 @@ +class AgentProxy: + # attr_reader :broker + def __init__(self, impl, broker): + self.impl = impl + self.broker = broker + + + def label(self): + return self.impl.getLabel() + + + def broker_bank(self): + return self.impl.getBrokerBank() + + + def agent_bank(self): + return self.impl.getAgentBank() + + + class Broker(ConnectionHandler): - # attr_reader :impl + # attr_reader :impl :conn, :console, :broker_bank def __init__(self, console, conn): - self._console = console - self._conn = conn + ConnectionHandler.__init__(self) + self.broker_bank = 1 + self.console = console + self.conn = conn self._session = None + self._cv = Condition() + self._stable = None self._event 
= qmfengine.BrokerEvent() self._xmtMessage = qmfengine.Message() - self.impl = qmfengine.BrokerProxy(self._console.impl) - self._console.impl.addConnection(self.impl, self) - self._conn.add_conn_handler(self) + self.impl = qmfengine.BrokerProxy(self.console.impl) + self.console.impl.addConnection(self.impl, self) + self.conn.add_conn_handler(self) + self._operational = True + def shutdown(self): + self.console.impl.delConnection(self.impl) + self.conn.del_conn_handler(self) + self._operational = False + + + def wait_for_stable(self, timeout = None): + self._cv.acquire() + try: + if self._stable: + return + if timeout: + self._cv.wait(timeout) + if not self._stable: + raise Exception("Timed out: waiting for broker connection to become stable") + else: + while not self._stable: + self._cv.wait() + finally: + self._cv.release() + + + def send_query(self, query, ctx): + self.impl.sendQuery(query, ctx) + self.conn.kick() + + def do_broker_events(self): count = 0 valid = self.impl.getEvent(self._event) while valid: count += 1 - print "Broker Event: ", self._event.kind + # print "Broker Event: ", self._event.kind if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: pass elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: - self._conn.impl.declareQueue(self._session.handle, self._event.name) + self.conn.impl.declareQueue(self._session.handle, self._event.name) elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: - self._conn.impl.deleteQueue(self._session.handle, self._event.name) + self.conn.impl.deleteQueue(self._session.handle, self._event.name) elif self._event.kind == qmfengine.BrokerEvent.BIND: - self._conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) + self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) elif self._event.kind == qmfengine.BrokerEvent.UNBIND: - self._conn.impl.unbind(self._session.handle, self._event.exchange, 
self._event.name, self._event.bindingKey) + self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: self.impl.startProtocol() + elif self._event.kind == qmfengine.BrokerEvent.STABLE: + self_.cv.acquire() + try: + self._stable = True + self._cv.notify() + finally: + self._cv.release() + elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE: + result = [] + for idx in range(self._event.queryResponse.getObjectCount()): + result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self})) + self.console._get_result(result, self._event.context) + elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE: + obj = self._event.context + obj._method_result(MethodResponse(self._event.methodResponse())) self.impl.popEvent() valid = self.impl.getEvent(self._event) @@ -682,7 +1297,7 @@ valid = self.impl.getXmtMessage(self._xmtMessage) while valid: count += 1 - self._conn.impl.sendMessage(self._session.handle, self._xmtMessage) + self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) self.impl.popXmt() valid = self.impl.getXmtMessage(self._xmtMessage) @@ -691,16 +1306,16 @@ def do_events(self): while True: - ccnt = self._console.do_console_events() + self.console.start_console_events() bcnt = do_broker_events() mcnt = do_broker_messages() - if ccnt == 0 and bcnt == 0 and mcnt == 0: + if bcnt == 0 and mcnt == 0: break; def conn_event_connected(self): print "Console Connection Established..." 
- self._session = Session(self._conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) + self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) self.impl.sessionOpened(self._session.handle) self.do_events() @@ -710,12 +1325,18 @@ pass + def conn_event_visit(self): + self.do_events() + + def sess_event_session_closed(self, context, error): print "Console Session Lost" self.impl.sessionClosed() def sess_event_recv(self, context, message): + if not self._operational: + print "Unexpected RECV Event" self.impl.handleRcvMessage(message) self.do_events() @@ -778,7 +1399,7 @@ count += 1 if self._event.kind == qmfengine.AgentEvent.GET_QUERY: self._handler.get_query(self._event.sequence, - Query(self._event.query), + Query({"impl":self._event.query}), self._event.authUserId) elif self._event.kind == qmfengine.AgentEvent.START_SYNC: @@ -846,6 +1467,10 @@ pass + def conn_event_visit(self): + self.do_events() + + def sess_event_session_closed(self, context, error): print "Agent Session Lost" pass Modified: qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/tests/python_agent.py URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/tests/python_agent.py?rev=830605&r1=830604&r2=830605&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/tests/python_agent.py (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/tests/python_agent.py Wed Oct 28 15:34:46 2009 @@ -30,40 +30,40 @@ self.parent_class = qmf.SchemaObjectClass("org.apache.qpid.qmf", "parent") self.parent_class.add_property(qmf.SchemaProperty("name", qmf.TYPE_SSTR, {"index":True})) self.parent_class.add_property(qmf.SchemaProperty("state", qmf.TYPE_SSTR)) - + self.parent_class.add_property(qmf.SchemaProperty("uint64val", qmf.TYPE_UINT64)) self.parent_class.add_property(qmf.SchemaProperty("uint32val", qmf.TYPE_UINT32)) 
self.parent_class.add_property(qmf.SchemaProperty("uint16val", qmf.TYPE_UINT16)) self.parent_class.add_property(qmf.SchemaProperty("uint8val", qmf.TYPE_UINT8)) - + self.parent_class.add_property(qmf.SchemaProperty("int64val", qmf.TYPE_INT64)) self.parent_class.add_property(qmf.SchemaProperty("int32val", qmf.TYPE_INT32)) self.parent_class.add_property(qmf.SchemaProperty("int16val", qmf.TYPE_INT16)) self.parent_class.add_property(qmf.SchemaProperty("int8val", qmf.TYPE_INT8)) - + self.parent_class.add_statistic(qmf.SchemaStatistic("queryCount", qmf.TYPE_UINT32, {"unit":"query", "desc":"Query count"})) - + _method = qmf.SchemaMethod("echo", {"desc":"Check responsiveness of the agent object"}) _method.add_argument(qmf.SchemaArgument("sequence", qmf.TYPE_UINT32, {"dir":qmf.DIR_IN_OUT})) self.parent_class.add_method(_method) - + _method = qmf.SchemaMethod("set_numerics", {"desc":"Set the numeric values in the object"}) _method.add_argument(qmf.SchemaArgument("test", qmf.TYPE_SSTR, {"dir":qmf.DIR_IN})) self.parent_class.add_method(_method) - + _method = qmf.SchemaMethod("create_child", {"desc":"Create a new child object"}) _method.add_argument(qmf.SchemaArgument("child_name", qmf.TYPE_LSTR, {"dir":qmf.DIR_IN})) _method.add_argument(qmf.SchemaArgument("child_ref", qmf.TYPE_REF, {"dir":qmf.DIR_OUT})) self.parent_class.add_method(_method) - + _method = qmf.SchemaMethod("probe_userid", {"desc":"Return the user-id for this method call"}) _method.add_argument(qmf.SchemaArgument("userid", qmf.TYPE_SSTR, {"dir":qmf.DIR_OUT})) self.parent_class.add_method(_method) self.child_class = qmf.SchemaObjectClass("org.apache.qpid.qmf", "child") self.child_class.add_property(qmf.SchemaProperty("name", qmf.TYPE_SSTR, {"index":True})) - - + + def register(self, agent): agent.register_class(self.parent_class) agent.register_class(self.child_class) @@ -71,7 +71,13 @@ class App(qmf.AgentHandler): + ''' + Object that handles events received by the Agent. 
+ ''' def get_query(self, context, query, userId): + ''' + Respond to a Query request from a console. + ''' #print "Query: user=%s context=%d class=%s" % (userId, context, query.class_name()) #if query.object_id(): # print query.object_id().object_num_low() @@ -84,108 +90,139 @@ def method_call(self, context, name, object_id, args, userId): - # puts "Method: user=#{userId} context=#{context} method=#{name} object_num=#{object_id.object_num_low if object_id} args=#{args}" - # oid = self._agent.alloc_object_id(2) - # args['child_ref'] = oid - # self._child = qmf.QmfObject(self._model.child_class) - # self._child.set_attr("name", args.by_key("child_name")) - # self._child.set_object_id(oid) - # self._agent.method_response(context, 0, "OK", args) + ''' + Invoke a method call requested by the console. + ''' + #print "Method: name=%s user=%s context=%d object_id=%s args=%s" % (name, userId, context, object_id, args) if name == "echo": self._agent.method_response(context, 0, "OK", args) - + elif name == "set_numerics": _retCode = 0 _retText = "OK" - + if args['test'] == "big": + # + # note the alternate forms for setting object attributes: + # self._parent.set_attr("uint64val", 0x9494949449494949) - self._parent.set_attr("uint32val", 0xa5a55a5a) + self._parent.uint32val = 0xa5a55a5a self._parent.set_attr("uint16val", 0xb66b) - self._parent.set_attr("uint8val", 0xc7) - - self._parent.set_attr("int64val", 1000000000000000000) + self._parent["uint8val"] = 0xc7 + + self._parent.int64val = 1000000000000000000 self._parent.set_attr("int32val", 1000000000) - self._parent.set_attr("int16val", 10000) + self._parent["int16val"] = 10000 self._parent.set_attr("int8val", 100) - - elif args['test'] == "small": + + ## Test the __getattr__ implementation: + ## @todo: remove once python_client implements this + ## form of property access + assert self._parent["uint8val"] == 0xc7 + assert self._parent.uint64val == 0x9494949449494949 + assert self._parent.queryCount >= 0 + + # note the 
alternative argument access syntax: + elif args.test == "small": self._parent.set_attr("uint64val", 4) self._parent.set_attr("uint32val", 5) self._parent.set_attr("uint16val", 6) self._parent.set_attr("uint8val", 7) - + self._parent.set_attr("int64val", 8) self._parent.set_attr("int32val", 9) self._parent.set_attr("int16val", 10) self._parent.set_attr("int8val", 11) - + elif args['test'] == "negative": self._parent.set_attr("uint64val", 0) self._parent.set_attr("uint32val", 0) self._parent.set_attr("uint16val", 0) self._parent.set_attr("uint8val", 0) - + self._parent.set_attr("int64val", -10000000000) self._parent.set_attr("int32val", -100000) self._parent.set_attr("int16val", -1000) self._parent.set_attr("int8val", -100) - + else: _retCode = 1 _retText = "Invalid argument value for test" - + self._agent.method_response(context, _retCode, _retText, args) - + elif name == "create_child": + # + # Instantiate an object based on the Child Schema Class + # _oid = self._agent.alloc_object_id(2) args['child_ref'] = _oid - self._child = qmf.QmfObject(self._model.child_class) + self._child = qmf.AgentObject(self._model.child_class) self._child.set_attr("name", args["child_name"]) self._child.set_object_id(_oid) self._agent.method_response(context, 0, "OK", args) - + elif name == "probe_userid": args['userid'] = userId self._agent.method_response(context, 0, "OK", args) - + else: self._agent.method_response(context, 1, "Unimplemented Method: %s" % name, args) - - + + def main(self): + ''' + Agent application's main processing loop. 
+ ''' + # Connect to the broker self._settings = qmf.ConnectionSettings() + self._settings.sendUserId = True if len(sys.argv) > 1: - self._settings.set_attr("host", sys.argv[1]) + self._settings.host = str(sys.argv[1]) if len(sys.argv) > 2: - self._settings.set_attr("port", int(sys.argv[2])) + self._settings.port = int(sys.argv[2]) self._connection = qmf.Connection(self._settings) + + # Instantiate an Agent to serve me queries and method calls self._agent = qmf.Agent(self) - + + # Dynamically define the parent and child schemas, then + # register them with the agent self._model = Model() self._model.register(self._agent) - + + # Tell the agent about our connection to the broker self._agent.set_connection(self._connection) - - self._parent = qmf.QmfObject(self._model.parent_class) + + # Instantiate and populate an instance of the Parent + # Schema Object + self._parent = qmf.AgentObject(self._model.parent_class) + + ## @todo how do we force a test failure? + # verify the properties() and statistics() object methods: + assert len(self._parent.properties()) == 10 + assert len(self._parent.statistics()) == 1 + self._parent.set_attr("name", "Parent One") self._parent.set_attr("state", "OPERATIONAL") - + self._parent.set_attr("uint64val", 0) self._parent.set_attr("uint32val", 0) self._parent.set_attr("uint16val", 0) self._parent.set_attr("uint8val", 0) - + self._parent.set_attr("int64val", 0) self._parent.set_attr("int32val", 0) self._parent.set_attr("int16val", 0) self._parent.set_attr("int8val", 0) - + self._parent_oid = self._agent.alloc_object_id(1) self._parent.set_object_id(self._parent_oid) - - while True: # there may be a better way, but - time.sleep(1000) # I'm a python noob... + + # Now wait for events arriving on the connection + # to the broker... 
+ while True: + time.sleep(1000) Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp?rev=830605&r1=830604&r2=830605&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp Wed Oct 28 15:34:46 2009 @@ -185,6 +185,11 @@ return intval; } + if (key == attrSendUserId) { + boolval.setBool(sendUserId); + return boolval; + } + return strval; } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org