Author: tross
Date: Fri Mar 26 23:07:18 2010
New Revision: 928096

URL: http://svn.apache.org/viewvc?rev=928096&view=rev
Log:
For qmf-gen:
  - Added conditional generation of QMFv1 code (for broker and plugins only)
  - Added nesting IF/ENDIF capability to support above
For python console:
  - Added support for V2 and V1 get queries
  - Handle both possible outcomes of the race between v2 data and the schema
    for that data

Modified:
    
qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h
    qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmf-gen
    qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/generate.py
    
qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.h
    qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py

Modified: 
qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h?rev=928096&r1=928095&r2=928096&view=diff
==============================================================================
--- 
qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h 
(original)
+++ 
qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h 
Fri Mar 26 23:07:18 2010
@@ -178,14 +178,17 @@ protected:
     virtual void doMethod(std::string&           methodName,
                           const messaging::VariantMap& inMap,
                           messaging::VariantMap& outMap) = 0;
-    virtual uint32_t writePropertiesSize() const = 0;
-    virtual void readProperties(const std::string& buf) = 0;
-    virtual void writeProperties(std::string& buf) const = 0;
-    virtual void writeStatistics(std::string& buf,
-                                 bool skipHeaders = false) = 0;
-    virtual void doMethod(std::string&           methodName,
-                          const std::string& inBuf,
-                          std::string& outBuf) = 0;
+
+    /**
+     * The following five methods are not pure-virtual because they will only
+     * be overridden in cases where QMFv1 is to be supported.
+     */
+    virtual uint32_t writePropertiesSize() const { return 0; }
+    virtual void readProperties(const std::string&) {}
+    virtual void writeProperties(std::string&) const {}
+    virtual void writeStatistics(std::string&, bool = false) {}
+    virtual void doMethod(std::string&, const std::string&, std::string&) {}
+
     QPID_COMMON_EXTERN virtual void setReference(ObjectId objectId);
 
     virtual std::string& getClassName() const = 0;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmf-gen
URL: 
http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmf-gen?rev=928096&r1=928095&r2=928096&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmf-gen (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmf-gen Fri Mar 26 
23:07:18 2010
@@ -62,8 +62,10 @@ if len(args) == 0:
 vargs = {}
 if opts.brokerplugin:
   vargs["agentHeaderDir"] = "management"
+  vargs["genQmfV1"]       = True
 else:
   vargs["agentHeaderDir"] = "agent"
+  vargs["genQmfV1"]       = None
 
 for schemafile in args:
   package = SchemaPackage(typefile, schemafile, opts)

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/generate.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/generate.py?rev=928096&r1=928095&r2=928096&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/generate.py 
(original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/generate.py Fri 
Mar 26 23:07:18 2010
@@ -38,39 +38,43 @@ class Template:
     self.filename = filename
     self.handler  = handler
     self.handler.initExpansion ()
-    self.writing  = True
+    self.writing  = 0  # 0 => write output lines; >0 => recursive depth of 
conditional regions
 
   def expandLine (self, line, stream, object):
     cursor = 0
     while 1:
       sub = line.find ("/*MGEN:", cursor)
       if sub == -1:
-        if self.writing:
+        if self.writing == 0:
           stream.write (line[cursor:len (line)])
         return
 
       subend = line.find("*/", sub)
-      if self.writing:
+      if self.writing == 0:
         stream.write (line[cursor:sub])
       cursor = subend + 2
 
       tag = line[sub:subend]
 
       if tag[7:10] == "IF(":
-        close = tag.find(")")
-        if close == -1:
-          raise ValueError ("Missing ')' on condition")
-        cond = tag[10:close]
-        dotPos = cond.find (".")
-        if dotPos == -1:
-          raise ValueError ("Invalid condition tag: %s" % cond)
-        tagObject = cond[0:dotPos]
-        tagName   = cond[dotPos + 1 : len(cond)]
-        if not self.handler.testCondition(object, tagObject, tagName):
-          self.writing = False
+        if self.writing == 0:
+          close = tag.find(")")
+          if close == -1:
+            raise ValueError ("Missing ')' on condition")
+          cond = tag[10:close]
+          dotPos = cond.find (".")
+          if dotPos == -1:
+            raise ValueError ("Invalid condition tag: %s" % cond)
+          tagObject = cond[0:dotPos]
+          tagName   = cond[dotPos + 1 : len(cond)]
+          if not self.handler.testCondition(object, tagObject, tagName):
+            self.writing += 1
+        else:
+          self.writing += 1
 
       elif tag[7:12] == "ENDIF":
-        self.writing = True
+        if self.writing > 0:
+          self.writing -= 1
 
       else:
         equalPos = tag.find ("=")
@@ -80,12 +84,12 @@ class Template:
             raise ValueError ("Invalid tag: %s" % tag)
           tagObject = tag[7:dotPos]
           tagName   = tag[dotPos + 1:len (tag)]
-          if self.writing:
+          if self.writing == 0:
             self.handler.substHandler (object, stream, tagObject, tagName)
         else:
           tagKey = tag[7:equalPos]
           tagVal = tag[equalPos + 1:len (tag)]
-          if self.writing:
+          if self.writing == 0:
             self.handler.setVariable (tagKey, tagVal)
 
   def expand (self, object):
@@ -297,6 +301,9 @@ class Generator:
     self.packagelist.append(path)
     self.packagePath = self.normalize(self.dest + path)
 
+  def testGenQMFv1 (self, variables):
+    return variables["genQmfV1"]
+
   def genDisclaimer (self, stream, variables):
     prefix = variables["commentPrefix"]
     stream.write (prefix + " This source file was created by a code 
generator.\n")

Modified: 
qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp?rev=928096&r1=928095&r2=928096&view=diff
==============================================================================
--- 
qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp 
(original)
+++ 
qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp 
Fri Mar 26 23:07:18 2010
@@ -133,7 +133,7 @@ void /*MGEN:Class.NameCap*/::aggregatePe
 }
 /*MGEN:ENDIF*/
 
-
+/*MGEN:IF(Root.GenQMFv1)*/
 uint32_t /*MGEN:Class.NameCap*/::writePropertiesSize() const
 {
     uint32_t size = writeTimestampsSize();
@@ -263,7 +263,7 @@ void /*MGEN:Class.NameCap*/::doMethod (/
 
     outBuf.getRawData(outStr, _bufLen);
 }
-
+/*MGEN:ENDIF*/
 std::string /*MGEN:Class.NameCap*/::getKey() const
 {
     std::stringstream key;

Modified: 
qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.h?rev=928096&r1=928095&r2=928096&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.h 
(original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.h 
Fri Mar 26 23:07:18 2010
@@ -83,6 +83,7 @@ class /*MGEN:Class.NameCap*/ : public ::
                   const ::qpid::messaging::VariantMap& inMap,
                   ::qpid::messaging::VariantMap& outMap);
     std::string getKey() const;
+/*MGEN:IF(Root.GenQMFv1)*/
     uint32_t writePropertiesSize() const;
     void readProperties(const std::string& buf);
     void writeProperties(std::string& buf) const;
@@ -90,6 +91,7 @@ class /*MGEN:Class.NameCap*/ : public ::
     void doMethod(std::string& methodName,
                   const std::string& inBuf,
                   std::string& outBuf);
+/*MGEN:ENDIF*/
 
     writeSchemaCall_t getWriteSchemaCall() { return writeSchema; }
 /*MGEN:IF(Class.NoStatistics)*/

Modified: qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py?rev=928096&r1=928095&r2=928096&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py Fri Mar 
26 23:07:18 2010
@@ -162,18 +162,18 @@ class Object(object):
           if property.name in notPresent:
             self._properties.append((property, None))
           else:
-            self._properties.append((property, 
self._session._decodeValue(codec, property.type, broker)))
+            self._properties.append((property, 
self._session._decodeValue(codec, property.type, self._broker)))
       if stat:
         for statistic in schema.getStatistics():
-          self._statistics.append((statistic, 
self._session._decodeValue(codec, statistic.type, broker)))
+          self._statistics.append((statistic, 
self._session._decodeValue(codec, statistic.type, self._broker)))
     else:
       for property in schema.getProperties():
         if property.optional:
           self._properties.append((property, None))
         else:
-          self._properties.append((property, 
self._session._defaultValue(property, broker, kwargs)))
+          self._properties.append((property, 
self._session._defaultValue(property, self._broker, kwargs)))
       for statistic in schema.getStatistics():
-          self._statistics.append((statistic, 
self._session._defaultValue(statistic, broker, kwargs)))
+          self._statistics.append((statistic, 
self._session._defaultValue(statistic, self._broker, kwargs)))
 
   def v2Init(self, omap, agentName):
     if omap.__class__ != dict:
@@ -828,7 +828,7 @@ class Session:
     smsg = broker._message(sendCodec.encoded)
     broker._send(smsg)
 
-  def _handleCommandComplete(self, broker, codec, seq):
+  def _handleCommandComplete(self, broker, codec, seq, agent):
     code = codec.read_uint32()
     text = codec.read_str8()
     context = self.seqMgr._release(seq)
@@ -850,6 +850,10 @@ class Session:
       finally:
         self.cv.release()
 
+    if agent:
+      agent._handleV1Completion(seq, code, text)
+
+
   def _handleClassInd(self, broker, codec, seq):
     kind  = codec.read_uint8()
     classKey = ClassKey(codec)
@@ -1714,6 +1718,7 @@ class Broker:
     self.authUser = authUser
     self.authPass = authPass
     self.cv = Condition()
+    self.seqToAgentMap = {}
     self.error = None
     self.brokerId = None
     self.connected = False
@@ -1792,6 +1797,20 @@ class Broker:
     else:
       return "Disconnected Broker"
 
+  def _setSequence(self, sequence, agent):
+    try:
+      self.cv.acquire()
+      self.seqToAgentMap[sequence] = agent
+    finally:
+      self.cv.release()
+
+  def _clearSequence(self, sequence):  # drop the sequence -> agent entry, under self.cv
+    try:
+      self.cv.acquire()
+      self.seqToAgentMap.pop(sequence, None)  # default avoids KeyError if never registered via _setSequence
+    finally:
+      self.cv.release()
+
   def _tryToConnect(self):
     try:
       try:
@@ -2071,17 +2090,28 @@ class Broker:
       agent = self.agents[agent_addr]
 
     codec = Codec(msg.body)
+    alreadyTried = None
     while True:
       opcode, seq = self._checkHeader(codec)
+
+      if not agent and not alreadyTried:
+        alreadyTried = True
+        try:
+          self.cv.acquire()
+          if seq in self.seqToAgentMap:
+            agent = self.seqToAgentMap[seq]
+        finally:
+          self.cv.release()
+
       if   opcode == None: return
       if   opcode == 'b': self.session._handleBrokerResp      (self, codec, 
seq)
       elif opcode == 'p': self.session._handlePackageInd      (self, codec, 
seq)
       elif opcode == 'q': self.session._handleClassInd        (self, codec, 
seq)
       elif opcode == 's': self.session._handleSchemaResp      (self, codec, 
seq, agent_addr)
       elif opcode == 'h': self.session._handleHeartbeatInd    (self, codec, 
seq, msg)
-      elif opcode == 'z': self.session._handleCommandComplete (self, codec, 
seq)
+      elif opcode == 'z': self.session._handleCommandComplete (self, codec, 
seq, agent)
       elif agent:
-        agent._handleQmfV1Message(opcode, mp, ah, codec)
+        agent._handleQmfV1Message(opcode, seq, mp, ah, codec)
 
     self.amqpSession.receiver._completed.add(msg.id)
     
self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
@@ -2261,6 +2291,7 @@ class Agent:
     try:
       self.lock.acquire()
       self.contextMap[sequence] = context
+      context.setSequence(sequence)
     finally:
       self.lock.release()
 
@@ -2271,6 +2302,7 @@ class Agent:
     if self.isV2:
       self._v2SendGetQuery(sequence, kwargs)
     else:
+      self.broker._setSequence(sequence, self)
       self._v1SendGetQuery(sequence, kwargs)
 
     #
@@ -2284,10 +2316,17 @@ class Agent:
       if context.exception:
         raise Exception(context.exception)
       result = context.queryResults
-      self.contextMap.pop(sequence)
       return result
 
 
+  def _clearContext(self, sequence):
+    try:
+      self.lock.acquire()
+      self.contextMap.pop(sequence)
+    finally:
+      self.lock.release()
+
+
   def _schemaInfoFromV2Agent(self):
     """
     We have just received new schema information from this agent.  Check to 
see if there's
@@ -2295,7 +2334,9 @@ class Agent:
     """
     try:
       self.lock.acquire()
-      copy_of_map = self.contextMap
+      copy_of_map = {}
+      for item in self.contextMap:
+        copy_of_map[item] = self.contextMap[item]
     finally:
       self.lock.release()
 
@@ -2304,6 +2345,26 @@ class Agent:
       copy_of_map[context].reprocess()
 
 
+  def _handleV1Completion(self, sequence, code, text):
+    """
+    Called if one of this agent's V1 commands completed
+    """
+    context = None
+    try:
+      self.lock.acquire()
+      if sequence in self.contextMap:
+        context = self.contextMap[sequence]
+    finally:
+      self.lock.release()
+
+    if context:
+      if code != 0:
+        ex = "Error %d: %s" % (code, text)
+        context.setException(ex)
+      context.signal()
+    self.broker._clearSequence(sequence)
+
+
   def _v1HandleMethodResp(self, codec, seq):
     """
     Handle a QMFv1 method response
@@ -2342,7 +2403,7 @@ class Agent:
       self.console.event(broker, event)
 
 
-  def _v1HandleContentInd(self, broker, codec, seq, prop=False, stat=False):
+  def _v1HandleContentInd(self, codec, sequence, prop=False, stat=False):
     """
     Handle a QMFv1 content indication
     """
@@ -2351,24 +2412,20 @@ class Agent:
     if not schema:
       return
 
-    obj = Object(self, broker, schema, codec, prop, stat)
+    obj = Object(self, schema, codec, prop, stat)
     if classKey.getPackageName() == "org.apache.qpid.broker" and 
classKey.getClassName() == "agent" and prop:
-      broker._updateAgent(obj)
+      self.broker._updateAgent(obj)
 
     try:
       self.lock.acquire()
-      if seq in self.syncSequenceList:
-        if object.getTimestamps()[2] == 0 and self._selectMatch(object):
-          self.getResult.append(object)
-        return
+      # get() binds context to None for an unknown sequence so the unsolicitedContext fallback below can run
+      context = self.contextMap.get(sequence)
     finally:
       self.lock.release()
 
-    if self.console and self.rcvObjects:
-      if prop:
-        self.console.objectProps(broker, object)
-      if stat:
-        self.console.objectStats(broker, object)
+    if not context:
+      context = self.unsolicitedContext
+    context.addV1QueryResult(obj)
 
 
   def _v2HandleDataInd(self, mp, ah, content):
@@ -2376,10 +2433,14 @@ class Agent:
     Handle a QMFv2 data indication from the agent
     """
     if mp.correlation_id:
-      sequence = int(mp.correlation_id)
-      if sequence not in self.contextMap:
-        return
-      context = self.contextMap[sequence]
+      try:
+        self.lock.acquire()
+        sequence = int(mp.correlation_id)
+        if sequence not in self.contextMap:
+          return
+        context = self.contextMap[sequence]
+      finally:
+        self.lock.release()
     else:
       context = self.unsolicitedContext
 
@@ -2405,8 +2466,33 @@ class Agent:
     pass
 
 
-  def _v1SendGetQuery(self, kwargs):
-    pass
+  def _v1SendGetQuery(self, sequence, kwargs):
+    """
+    Send a get query to a QMFv1 agent.
+    """
+    #
+    # Build the query map
+    #
+    query = {}
+    if '_class' in kwargs:
+      query['_class'] = kwargs['_class']
+      if '_package' in kwargs:
+        query['_package'] = kwargs['_package']
+    elif '_key' in kwargs:
+      key = kwargs['_key']
+      query['_class'] = key.getClassName()
+      query['_package'] = key.getPackageName()
+    elif '_objectId' in kwargs:
+      query['_objectid'] = kwargs['_objectId'].__repr__()
+
+    #
+    # Construct and transmit the message
+    #
+    sendCodec = Codec()
+    self.broker._setHeader(sendCodec, 'G', sequence)
+    sendCodec.write_map(query)
+    smsg = self.broker._message(sendCodec.encoded, "agent.%d.%s" % 
(self.brokerBank, self.agentBank))
+    self.broker._send(smsg)
 
 
   def _v2SendGetQuery(self, sequence, kwargs):
@@ -2460,7 +2546,7 @@ class Agent:
     self.broker._send(smsg, "qmf.default.direct")
 
 
-  def _handleQmfV1Message(self, opcode, mp, ah, codec):
+  def _handleQmfV1Message(self, opcode, seq, mp, ah, codec):
     """
     Process QMFv1 messages arriving from an agent.
     """
@@ -2487,8 +2573,10 @@ class Agent:
 class RequestContext(object):
   """
   This class tracks an asynchronous request sent to an agent.
+  TODO: Add logic for client-side selection and filtering deleted objects from 
get-queries
   """
   def __init__(self, agent, notifiable):
+    self.sequence = None
     self.agent = agent
     self.schemaCache = self.agent.schemaCache
     self.notifiable = notifiable
@@ -2497,10 +2585,22 @@ class RequestContext(object):
     self.queryResults = []
     self.exception = None
     self.waitingForSchema = None
+    self.pendingSignal = None
     self.cv = Condition()
     self.blocked = notifiable == None
 
 
+  def setSequence(self, sequence):
+    self.sequence =  sequence
+
+
+  def addV1QueryResult(self, data):  # route one QMFv1 result object for this request
+    if self.notifiable:
+      self.notifiable(qmf_object=data)  # callback supplied: pass the object straight to it
+    else:
+      self.queryResults.append(data)  # no callback: accumulate in queryResults
+
+
   def addV2QueryResult(self, data):
     self.rawQueryResults.append(data)
 
@@ -2528,10 +2628,26 @@ class RequestContext(object):
   def signal(self):
     try:
       self.cv.acquire()
-      self.blocked = None
-      self.cv.notify()
+      if self.waitingForSchema:
+        self.pendingSignal = True
+        return
+      else:
+        self.blocked = None
+        self.cv.notify()
     finally:
       self.cv.release()
+    self._complete()
+
+
+  def _complete(self):
+    if self.notifiable:
+      if self.exception:
+        self.notifiable(qmf_exception=self.exception)
+      else:
+        self.notifiable(qmf_complete=True)
+
+    if self.sequence:
+      self.agent._clearContext(self.sequence)
 
 
   def processV2Data(self):
@@ -2568,6 +2684,19 @@ class RequestContext(object):
       else:
         self.queryResults.append(result)
 
+    complete = None
+    try:
+      self.cv.acquire()
+      if not self.waitingForSchema and self.pendingSignal:
+        self.blocked = None
+        self.cv.notify()
+        complete = True
+    finally:
+      self.cv.release()
+
+    if complete:
+      self._complete()
+
 
   def reprocess(self):
     """



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to