Hi, guys,
I am writing a ganglia python module to send Flume metrics to ganglia. It's
pretty much done, running the python script on command line gives all the
metrics; however, when I enabled it in ganglia, only some of the metrics were
sent, "telnet localhost 8649" confirms that only some of the metrics are
known to ganglia. I have been trying to guess what the problem is, but with
no luck. Here is my code, please let me know any suggestion you might have:
flume_stats.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import threading
import time
import traceback
import urllib2
import json
descriptors = list()
JVM_Metrics = {
'mem.heap.init' : 'init heap size',
'mem.heap.max' : 'max heap size',
'mem.heap.used' : 'used heap size',
}
Node_Metrics = {
'sink.LazyOpenDecorator.BackoffFailover.failsPrimary' : 'sink
primary failed events',
'sink.LazyOpenDecorator.BackoffFailover.sentBackups' : 'sink
backup sent events',
'sink.LazyOpenDecorator.BackoffFailover.sentPrimary' : 'sink
primary sent events',
'sink.LazyOpenDecorator.Collector.AckChecksumChecker.InsistentAppend.appendRequests'
: 'requsted events',
'sink.LazyOpenDecorator.Collector.AckChecksumChecker.InsistentAppend.appendRetries'
: 'retried events',
'sink.LazyOpenDecorator.Collector.AckChecksumChecker.InsistentAppend.appendSuccessses'
: 'succeeded events',
'source.LazyOpenSource.ThriftEventSource.number of bytes' : 'source
thrift bytesIn',
'source.LazyOpenSource.ThriftEventSource.number of events' : 'source
thrift eventsIn',
'source.LazyOpenSource.TailSource.number of bytes' : 'source
tail bytesIn',
'source.LazyOpenSource.TailSource.number of events' : 'source
tail eventsIn',
}
Short_Metric_Name = {
'sink.LazyOpenDecorator.BackoffFailover.failsPrimary' :
'snk.pri.fails',
'sink.LazyOpenDecorator.BackoffFailover.sentBackups' :
'snk.bkup.sent',
'sink.LazyOpenDecorator.BackoffFailover.sentPrimary' :
'snk.pri.sent',
'sink.LazyOpenDecorator.Collector.AckChecksumChecker.InsistentAppend.appendRequests'
: 'snk.rqst.evt',
'sink.LazyOpenDecorator.Collector.AckChecksumChecker.InsistentAppend.appendRetries'
: 'snk.rtr.evt',
'sink.LazyOpenDecorator.Collector.AckChecksumChecker.InsistentAppend.appendSuccessses'
: 'snk.sus.evt',
'source.LazyOpenSource.ThriftEventSource.number of bytes' :
'src.thrift.bytes',
'source.LazyOpenSource.ThriftEventSource.number of events' :
'src.thrift.evt',
'source.LazyOpenSource.TailSource.number of bytes' :
'src.tail.bytes',
'source.LazyOpenSource.TailSource.number of events' :
'src.tail.evt',
}
_worker_thread = None
_lock = threading.Lock()
class UpdateFlumeMetricThread(threading.Thread):
'''update Flume metrics'''
def __init__(self, params):
threading.Thread.__init__(self)
self.running = False
self.exiting = False
self.url = params["url"]
self.refresh_rate = int(params["refresh_rate"])
self.metric = {}
def exit(self):
self.exiting = True
if not self.running:
return
self.join()
def run(self):
self.running = True
while not self.exiting:
_lock.acquire()
self.update_metric()
_lock.release()
time.sleep(self.refresh_rate)
self.running = False
def update_metric(self):
try:
req = urllib2.Request(url = self.url)
res = urllib2.urlopen(req)
j = json.loads(res.read())
for k in JVM_Metrics.keys():
self.metric['jvm.' + k] = j['jvmInfo'][k]
for l_nodes in j['logicalnodes'].keys():
req = urllib2.Request(url = self.url + "/" + l_nodes)
res = urllib2.urlopen(req)
j_node = json.loads(res.read())
for k_node in Node_Metrics.keys():
if k_node in j_node.keys():
self.metric['node.' + l_nodes[:3] + '.' +
Short_Metric_Name[k_node]] = j_node[k_node]
except urllib2.URLError:
traceback.print_exc()
else:
res.close()
def get_metric(self, name):
val = 0
if name in self.metric:
_lock.acquire()
val = self.metric[name]
_lock.release()
return val
def get_metric(name):
return _worker_thread.get_metric(name)
def metric_init(params):
global descriptors, _worker_thread
print '[flume_stats] Received the following parameters'
print params
if "metric_group" not in params:
params["metric_group"] = "flume"
if "refresh_rate" not in params:
params["refresh_rate"] = 30
if "url" not in params:
params["url"] = "http://localhost:35862/node/reports"
_worker_thread = UpdateFlumeMetricThread(params)
_worker_thread.start()
for (k,v) in JVM_Metrics.iteritems():
descriptors.append({
"name" : 'jvm.' + k,
"call_back" : get_metric,
"time_max" : 60,
"value_type" : "uint",
"units" : "bytes",
"format" : "%u",
"description" : v,
"groups" : params["metric_group"],
})
req = urllib2.Request(params["url"])
res = urllib2.urlopen(req)
j = json.loads(res.read())
for l_nodes in j['logicalnodes'].keys():
req = urllib2.Request(params["url"] + "/" + l_nodes)
res = urllib2.urlopen(req)
j_node = json.loads(res.read())
for k_node in Node_Metrics.keys():
if k_node in j_node:
descriptors.append({
"name" : 'node.' + l_nodes[:3] + '.' +
Short_Metric_Name[k_node],
"call_back" : get_metric,
"time_max" : 60,
"value_type" : "uint",
"units" : "bytes_events",
"format" : "%u",
"description" : Node_Metrics[k_node],
"groups" : params["metric_group"],
})
return descriptors
def metric_cleanup():
'''Clean up the metric module.'''
_worker_thread.exit()
#This code is for debugging and unit testing
if __name__ == '__main__':
try:
params = {
'url': 'http://localhost:35862/node/reports',
}
metric_init(params)
while True:
for d in descriptors:
v = d['call_back'](d['name'])
print 'value for %s is %u' % (d['name'], v)
time.sleep(30)
except KeyboardInterrupt:
time.sleep(0.2)
os._exit(1)
and flume_stats.pyconf:
modules {
module {
name = "flume_stats"
language = "python"
param url {
value = "http://localhost:35862/node/reports"
}
param refresh_rate {
value = 30
}
param metric_group {
value = "flume"
}
}
}
collection_group {
collect_every = 30
time_threshold = 60
metric {
name_match = "node\.([a-z\.]+)"
title = "Flume logicalnode \\1"
value_threshold = 0
}
metric {
name_match = "jvm\.([a-z\.]+)"
title = "Flume jvm \\1"
value_threshold = 0
}
}
the output of running flume_stats.py:
[flume_stats] Received the following parameters
{'url': 'http://localhost:35862/node/reports'}
value for jvm.mem.heap.init is 114117632
value for jvm.mem.heap.used is 362908560
value for jvm.mem.heap.max is 1623719936
value for node.apa.src.tail.bytes is 82739062
value for node.apa.src.tail.evt is 387330
value for node.apa.snk.bkup.sent is 1
value for node.apa.snk.pri.fails is 1
value for node.apa.snk.pri.sent is 387329
value for node.jan.src.thrift.evt is 554692327
value for node.jan.snk.bkup.sent is 0
value for node.jan.src.thrift.bytes is 634427896119
value for node.jan.snk.pri.fails is 0
value for node.jan.snk.pri.sent is 554692327
value for jvm.mem.heap.init is 114117632
value for jvm.mem.heap.used is 362908560
value for jvm.mem.heap.max is 1623719936
value for node.apa.src.tail.bytes is 82739062
value for node.apa.src.tail.evt is 387330
value for node.apa.snk.bkup.sent is 1
value for node.apa.snk.pri.fails is 1
value for node.apa.snk.pri.sent is 387329
value for node.jan.src.thrift.evt is 554692327
value for node.jan.snk.bkup.sent is 0
value for node.jan.src.thrift.bytes is 634427896119
value for node.jan.snk.pri.fails is 0
value for node.jan.snk.pri.sent is 554692327
...
Shuang
------------------------------------------------------------------------------
All the data continuously generated in your IT infrastructure contains a
definitive record of customers, application performance, security
threats, fraudulent activity and more. Splunk takes this data and makes
sense of it. Business sense. IT sense. Common sense.
http://p.sf.net/sfu/splunk-d2dcopy1
_______________________________________________
Ganglia-developers mailing list
Ganglia-developers@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/ganglia-developers