Author: tross Date: Fri Mar 26 23:07:18 2010 New Revision: 928096 URL: http://svn.apache.org/viewvc?rev=928096&view=rev Log: For qmf-gen: - Added conditional generation of QMFv1 code (for broker and plugins only) - Added nesting IF/ENDIF capability to support above For python console: - Added support for V2 and V1 get queries - Handle both possible results of race between v2 data and the schema for the data
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmf-gen qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/generate.py qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.h qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h?rev=928096&r1=928095&r2=928096&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h (original) +++ qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h Fri Mar 26 23:07:18 2010 @@ -178,14 +178,17 @@ protected: virtual void doMethod(std::string& methodName, const messaging::VariantMap& inMap, messaging::VariantMap& outMap) = 0; - virtual uint32_t writePropertiesSize() const = 0; - virtual void readProperties(const std::string& buf) = 0; - virtual void writeProperties(std::string& buf) const = 0; - virtual void writeStatistics(std::string& buf, - bool skipHeaders = false) = 0; - virtual void doMethod(std::string& methodName, - const std::string& inBuf, - std::string& outBuf) = 0; + + /** + * The following five methods are not pure-virtual because they will only + * be overridden in cases where QMFv1 is to be supported. + */ + virtual uint32_t writePropertiesSize() const { return 0; } + virtual void readProperties(const std::string&) {} + virtual void writeProperties(std::string&) const {} + virtual void writeStatistics(std::string&, bool = false) {} + virtual void doMethod(std::string&, const std::string&, std::string&) {} + QPID_COMMON_EXTERN virtual void setReference(ObjectId objectId); virtual std::string& getClassName() const = 0; Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmf-gen URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmf-gen?rev=928096&r1=928095&r2=928096&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmf-gen (original) +++ qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmf-gen Fri Mar 26 23:07:18 2010 @@ -62,8 +62,10 @@ if len(args) == 0: vargs = {} if opts.brokerplugin: vargs["agentHeaderDir"] = "management" + vargs["genQmfV1"] = True else: vargs["agentHeaderDir"] = "agent" + vargs["genQmfV1"] = None for schemafile in args: package = SchemaPackage(typefile, schemafile, opts) Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/generate.py URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/generate.py?rev=928096&r1=928095&r2=928096&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/generate.py (original) +++ qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/generate.py Fri Mar 26 23:07:18 2010 @@ -38,39 +38,43 @@ class Template: self.filename = filename self.handler = handler self.handler.initExpansion () - self.writing = True + self.writing = 0 # 0 => write output lines; >0 => recursive depth of conditional regions def expandLine (self, line, stream, object): cursor = 0 while 1: sub = line.find ("/*MGEN:", cursor) if sub == -1: - if self.writing: + if self.writing == 0: stream.write (line[cursor:len (line)]) return subend = line.find("*/", sub) - if self.writing: + if self.writing == 0: stream.write (line[cursor:sub]) cursor = subend + 2 tag = line[sub:subend] if tag[7:10] == "IF(": - close = tag.find(")") - if close == -1: - raise ValueError ("Missing ')' on condition") - cond = tag[10:close] - dotPos = cond.find (".") - if dotPos == -1: - raise ValueError ("Invalid condition tag: %s" % cond) - tagObject = cond[0:dotPos] - tagName = cond[dotPos + 1 : len(cond)] - if not self.handler.testCondition(object, tagObject, tagName): - self.writing = False + if self.writing == 0: + close = tag.find(")") + if close == -1: + raise ValueError ("Missing ')' on condition") + cond = tag[10:close] + dotPos = cond.find (".") + if dotPos == -1: + raise ValueError ("Invalid condition tag: %s" % cond) + tagObject = cond[0:dotPos] + tagName = cond[dotPos + 1 : len(cond)] + if not self.handler.testCondition(object, tagObject, tagName): + self.writing += 1 + else: + self.writing += 1 elif tag[7:12] == "ENDIF": - self.writing = True + if self.writing > 0: + self.writing -= 1 else: equalPos = tag.find ("=") @@ -80,12 +84,12 @@ class Template: raise ValueError ("Invalid tag: %s" % tag) tagObject = tag[7:dotPos] tagName = tag[dotPos + 1:len (tag)] - if self.writing: + if self.writing == 0: self.handler.substHandler (object, stream, tagObject, tagName) else: tagKey = tag[7:equalPos] tagVal = tag[equalPos + 1:len (tag)] - if self.writing: + if self.writing == 0: self.handler.setVariable (tagKey, tagVal) def expand (self, object): @@ -297,6 +301,9 @@ class Generator: self.packagelist.append(path) self.packagePath = self.normalize(self.dest + path) + def testGenQMFv1 (self, variables): + return variables["genQmfV1"] + def genDisclaimer (self, stream, variables): prefix = variables["commentPrefix"] stream.write (prefix + " This source file was created by a code generator.\n") Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp?rev=928096&r1=928095&r2=928096&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp (original) +++ qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp Fri Mar 26 23:07:18 2010 @@ -133,7 +133,7 @@ void /*MGEN:Class.NameCap*/::aggregatePe } /*MGEN:ENDIF*/ - +/*MGEN:IF(Root.GenQMFv1)*/ uint32_t /*MGEN:Class.NameCap*/::writePropertiesSize() const { uint32_t size = writeTimestampsSize(); @@ -263,7 +263,7 @@ void /*MGEN:Class.NameCap*/::doMethod (/ outBuf.getRawData(outStr, _bufLen); } - +/*MGEN:ENDIF*/ std::string /*MGEN:Class.NameCap*/::getKey() const { std::stringstream key; Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.h URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.h?rev=928096&r1=928095&r2=928096&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.h (original) +++ qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.h Fri Mar 26 23:07:18 2010 @@ -83,6 +83,7 @@ class /*MGEN:Class.NameCap*/ : public :: const ::qpid::messaging::VariantMap& inMap, ::qpid::messaging::VariantMap& outMap); std::string getKey() const; +/*MGEN:IF(Root.GenQMFv1)*/ uint32_t writePropertiesSize() const; void readProperties(const std::string& buf); void writeProperties(std::string& buf) const; @@ -90,6 +91,7 @@ class /*MGEN:Class.NameCap*/ : public :: void doMethod(std::string& methodName, const std::string& inBuf, std::string& outBuf); +/*MGEN:ENDIF*/ writeSchemaCall_t getWriteSchemaCall() { return writeSchema; } /*MGEN:IF(Class.NoStatistics)*/ Modified: qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py?rev=928096&r1=928095&r2=928096&view=diff ============================================================================== --- qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py (original) +++ qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py Fri Mar 26 23:07:18 2010 @@ -162,18 +162,18 @@ class Object(object): if property.name in notPresent: self._properties.append((property, None)) else: - self._properties.append((property, self._session._decodeValue(codec, property.type, broker))) + self._properties.append((property, self._session._decodeValue(codec, property.type, self._broker))) if stat: for statistic in schema.getStatistics(): - self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, broker))) + self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, self._broker))) else: for property in schema.getProperties(): if property.optional: self._properties.append((property, None)) else: - self._properties.append((property, self._session._defaultValue(property, broker, kwargs))) + self._properties.append((property, self._session._defaultValue(property, self._broker, kwargs))) for statistic in schema.getStatistics(): - self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs))) + self._statistics.append((statistic, self._session._defaultValue(statistic, self._broker, kwargs))) def v2Init(self, omap, agentName): if omap.__class__ != dict: @@ -828,7 +828,7 @@ class Session: smsg = broker._message(sendCodec.encoded) broker._send(smsg) - def _handleCommandComplete(self, broker, codec, seq): + def _handleCommandComplete(self, broker, codec, seq, agent): code = codec.read_uint32() text = codec.read_str8() context = self.seqMgr._release(seq) @@ -850,6 +850,10 @@ class Session: finally: self.cv.release() + if agent: + agent._handleV1Completion(seq, code, text) + + def _handleClassInd(self, broker, codec, seq): kind = codec.read_uint8() classKey = ClassKey(codec) @@ -1714,6 +1718,7 @@ class Broker: self.authUser = authUser self.authPass = authPass self.cv = Condition() + self.seqToAgentMap = {} self.error = None self.brokerId = None self.connected = False @@ -1792,6 +1797,20 @@ class Broker: else: return "Disconnected Broker" + def _setSequence(self, sequence, agent): + try: + self.cv.acquire() + self.seqToAgentMap[sequence] = agent + finally: + self.cv.release() + + def _clearSequence(self, sequence): + try: + self.cv.acquire() + self.seqToAgentMap.pop(sequence) + finally: + self.cv.release() + def _tryToConnect(self): try: try: @@ -2071,17 +2090,28 @@ class Broker: agent = self.agents[agent_addr] codec = Codec(msg.body) + alreadyTried = None while True: opcode, seq = self._checkHeader(codec) + + if not agent and not alreadyTried: + alreadyTried = True + try: + self.cv.acquire() + if seq in self.seqToAgentMap: + agent = self.seqToAgentMap[seq] + finally: + self.cv.release() + if opcode == None: return if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) elif opcode == 'q': self.session._handleClassInd (self, codec, seq) elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr) elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) - elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) + elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent) elif agent: - agent._handleQmfV1Message(opcode, mp, ah, codec) + agent._handleQmfV1Message(opcode, seq, mp, ah, codec) self.amqpSession.receiver._completed.add(msg.id) self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) @@ -2261,6 +2291,7 @@ class Agent: try: self.lock.acquire() self.contextMap[sequence] = context + context.setSequence(sequence) finally: self.lock.release() @@ -2271,6 +2302,7 @@ class Agent: if self.isV2: self._v2SendGetQuery(sequence, kwargs) else: + self.broker._setSequence(sequence, self) self._v1SendGetQuery(sequence, kwargs) # @@ -2284,10 +2316,17 @@ class Agent: if context.exception: raise Exception(context.exception) result = context.queryResults - self.contextMap.pop(sequence) return result + def _clearContext(self, sequence): + try: + self.lock.acquire() + self.contextMap.pop(sequence) + finally: + self.lock.release() + + def _schemaInfoFromV2Agent(self): """ We have just received new schema information from this agent. Check to see if there's @@ -2295,7 +2334,9 @@ class Agent: """ try: self.lock.acquire() - copy_of_map = self.contextMap + copy_of_map = {} + for item in self.contextMap: + copy_of_map[item] = self.contextMap[item] finally: self.lock.release() @@ -2304,6 +2345,26 @@ class Agent: copy_of_map[context].reprocess() + def _handleV1Completion(self, sequence, code, text): + """ + Called if one of this agent's V1 commands completed + """ + context = None + try: + self.lock.acquire() + if sequence in self.contextMap: + context = self.contextMap[sequence] + finally: + self.lock.release() + + if context: + if code != 0: + ex = "Error %d: %s" % (code, text) + context.setException(ex) + context.signal() + self.broker._clearSequence(sequence) + + def _v1HandleMethodResp(self, codec, seq): """ Handle a QMFv1 method response @@ -2342,7 +2403,7 @@ class Agent: self.console.event(broker, event) - def _v1HandleContentInd(self, broker, codec, seq, prop=False, stat=False): + def _v1HandleContentInd(self, codec, sequence, prop=False, stat=False): """ Handle a QMFv1 content indication """ @@ -2351,24 +2412,20 @@ class Agent: if not schema: return - obj = Object(self, broker, schema, codec, prop, stat) + obj = Object(self, schema, codec, prop, stat) if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: - broker._updateAgent(obj) + self.broker._updateAgent(obj) try: self.lock.acquire() - if seq in self.syncSequenceList: - if object.getTimestamps()[2] == 0 and self._selectMatch(object): - self.getResult.append(object) - return + if sequence in self.contextMap: + context = self.contextMap[sequence] finally: self.lock.release() - if self.console and self.rcvObjects: - if prop: - self.console.objectProps(broker, object) - if stat: - self.console.objectStats(broker, object) + if not context: + context = self.unsolicitedContext + context.addV1QueryResult(obj) def _v2HandleDataInd(self, mp, ah, content): @@ -2376,10 +2433,14 @@ class Agent: Handle a QMFv2 data indication from the agent """ if mp.correlation_id: - sequence = int(mp.correlation_id) - if sequence not in self.contextMap: - return - context = self.contextMap[sequence] + try: + self.lock.acquire() + sequence = int(mp.correlation_id) + if sequence not in self.contextMap: + return + context = self.contextMap[sequence] + finally: + self.lock.release() else: context = self.unsolicitedContext @@ -2405,8 +2466,33 @@ class Agent: pass - def _v1SendGetQuery(self, kwargs): - pass + def _v1SendGetQuery(self, sequence, kwargs): + """ + Send a get query to a QMFv1 agent. + """ + # + # Build the query map + # + query = {} + if '_class' in kwargs: + query['_class'] = kwargs['_class'] + if '_package' in kwargs: + query['_package'] = kwargs['_package'] + elif '_key' in kwargs: + key = kwargs['_key'] + query['_class'] = key.getClassName() + query['_package'] = key.getPackageName() + elif '_objectId' in kwargs: + query['_objectid'] = kwargs['_objectId'].__repr__() + + # + # Construct and transmit the message + # + sendCodec = Codec() + self.broker._setHeader(sendCodec, 'G', sequence) + sendCodec.write_map(query) + smsg = self.broker._message(sendCodec.encoded, "agent.%d.%s" % (self.brokerBank, self.agentBank)) + self.broker._send(smsg) def _v2SendGetQuery(self, sequence, kwargs): @@ -2460,7 +2546,7 @@ class Agent: self.broker._send(smsg, "qmf.default.direct") - def _handleQmfV1Message(self, opcode, mp, ah, codec): + def _handleQmfV1Message(self, opcode, seq, mp, ah, codec): """ Process QMFv1 messages arriving from an agent. """ @@ -2487,8 +2573,10 @@ class Agent: class RequestContext(object): """ This class tracks an asynchronous request sent to an agent. + TODO: Add logic for client-side selection and filtering deleted objects from get-queries """ def __init__(self, agent, notifiable): + self.sequence = None self.agent = agent self.schemaCache = self.agent.schemaCache self.notifiable = notifiable @@ -2497,10 +2585,22 @@ class RequestContext(object): self.queryResults = [] self.exception = None self.waitingForSchema = None + self.pendingSignal = None self.cv = Condition() self.blocked = notifiable == None + def setSequence(self, sequence): + self.sequence = sequence + + + def addV1QueryResult(self, data): + if self.notifiable: + self.notifyable(qmf_object=data) + else: + self.queryResults.append(data) + + def addV2QueryResult(self, data): self.rawQueryResults.append(data) @@ -2528,10 +2628,26 @@ class RequestContext(object): def signal(self): try: self.cv.acquire() - self.blocked = None - self.cv.notify() + if self.waitingForSchema: + self.pendingSignal = True + return + else: + self.blocked = None + self.cv.notify() finally: self.cv.release() + self._complete() + + + def _complete(self): + if self.notifiable: + if self.exception: + self.notifiable(qmf_exception=self.exception) + else: + self.notifiable(qmf_complete=True) + + if self.sequence: + self.agent._clearContext(self.sequence) def processV2Data(self): @@ -2568,6 +2684,19 @@ class RequestContext(object): else: self.queryResults.append(result) + complete = None + try: + self.cv.acquire() + if not self.waitingForSchema and self.pendingSignal: + self.blocked = None + self.cv.notify() + complete = True + finally: + self.cv.release() + + if complete: + self._complete() + def reprocess(self): """ --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org