Author: kgiusti
Date: Tue May 18 20:34:34 2010
New Revision: 945871
URL: http://svn.apache.org/viewvc?rev=945871&view=rev
Log:
QMF: allow consoles to filter agent heartbeats based on agent identification.
Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=945871&r1=945870&r2=945871&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Tue May 18 20:34:34 2010 @@ -50,6 +50,9 @@ namespace { bool disabled = false; ManagementAgent* agent = 0; int refCount = 0; + + const string defaultVendorName("vendor"); + const string defaultProductName("product"); } ManagementAgent::Singleton::Singleton(bool disableManagement) @@ -176,7 +179,7 @@ void ManagementAgentImpl::init(const qpi storeData(true); if (attrMap.empty()) - setName("vendor", "product"); + setName(defaultVendorName, defaultProductName); initialized = true; } @@ -361,11 +364,25 @@ void ManagementAgentImpl::retrieveData() void ManagementAgentImpl::sendHeartbeat() { static const string addr_exchange("qmf.default.topic"); - static const string addr_key("agent.ind.heartbeat"); + static const string addr_key_base("agent.ind.heartbeat"); Variant::Map map; Variant::Map headers; string content; + std::stringstream addr_key; + + addr_key << addr_key_base; + + // append .<vendor>.<product> to address key if present. + Variant::Map::const_iterator v; + if ((v = attrMap.find("_vendor")) != attrMap.end() && + v->second.getString() != defaultVendorName) { + addr_key << "." << v->second.getString(); + if ((v = attrMap.find("_product")) != attrMap.end() && + v->second.getString() != defaultProductName) { + addr_key << "." 
<< v->second.getString(); + } + } headers["method"] = "indication"; headers["qmf.opcode"] = "_agent_heartbeat_indication"; @@ -377,7 +394,7 @@ void ManagementAgentImpl::sendHeartbeat( map["_values"].asMap()["epoch"] = bootSequence; MapCodec::encode(map, content); - connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key); + connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key.str()); QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); } Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=945871&r1=945870&r2=945871&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue May 18 20:34:34 2010 @@ -801,7 +801,18 @@ void ManagementAgent::periodicProcessing } if (qmf2Support) { - static const string addr_key("agent.ind.heartbeat"); + std::stringstream addr_key; + + addr_key << "agent.ind.heartbeat"; + + // append .<vendor>.<product> to address key if present. + Variant::Map::const_iterator v; + if ((v = attrMap.find("_vendor")) != attrMap.end()){ + addr_key << "." << v->second.getString(); + if ((v = attrMap.find("_product")) != attrMap.end()) { + addr_key << "." 
<< v->second.getString(); + } + } Variant::Map map; Variant::Map headers; @@ -817,7 +828,7 @@ void ManagementAgent::periodicProcessing string content; MapCodec::encode(map, content); - sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key); + sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str()); QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); } Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=945871&r1=945870&r2=945871&view=diff ============================================================================== --- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original) +++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Tue May 18 20:34:34 2010 @@ -574,6 +574,7 @@ class Session: self.rcvHeartbeats = False self.v1BindingKeyList, self.v2BindingKeyList = self._bindingKeys() self.manageConnections = manageConnections + self.agent_filter = [] # (vendor, product, instance) if self.userBindings and not self.rcvObjects: raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") @@ -802,15 +803,40 @@ class Session: """ """ pass + def addAgentFilter(self, vendor, product=None): + """ Listen for heartbeat messages only for those agent(s) that match the + vendor and, optionally, the product strings. + """ + key = "agent.ind.heartbeat." + vendor + if product is not None: + key += "." 
+ product + key += ".#" + + if key not in self.v2BindingKeyList: + self.v2BindingKeyList.append(key) + self.agent_filter.append((vendor, product, None)) + + # be sure we don't ever filter the local broker + local_broker_key = "agent.ind.heartbeat.apache.org.qpidd" + if local_broker_key not in self.v2BindingKeyList: + self.v2BindingKeyList.append(local_broker_key) + + # remove the wildcard key if present + try: + self.v2BindingKeyList.remove("agent.ind.heartbeat.#") + except: + pass def _bindingKeys(self): + """ The set of default key bindings.""" v1KeyList = [] v2KeyList = [] v1KeyList.append("schema.#") v2KeyList.append("agent.ind.heartbeat.#") if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats and not self.userBindings: v1KeyList.append("console.#") - v2KeyList.append("agent.#") + v2KeyList.append("agent.ind.data.#") + v2KeyList.append("agent.ind.event.#") else: if self.rcvObjects and not self.userBindings: v1KeyList.append("console.obj.#") --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org