Hello, I have ganglia python script, i would like to see output of each code block.
Could you please guide. I don't have any idea about python. My scripts as follows: #!/usr/bin/env python ######################################## # # ganglia-api.py - Ganglia Metric API # ######################################## from settings import LOGFILE, PIDFILE, API_SERVER, HEARTBEAT import os import sys import glob import re import logging import socket import select import datetime import time import SocketServer from xml.etree import ElementTree from xml.parsers.expat import ExpatError from urllib import quote from threading import Thread import tornado.httpserver import tornado.ioloop import tornado.options import tornado.web from tornado.options import define, options __version__ = '1.0.16' define("port", default=8080, help="run on the given port", type=int) logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s[%(process)d] %(levelname)s - %(message)s", filename=LOGFILE) global logger logger = logging.getLogger("ganglia-api") class Elem: def __init__(self, elem): self.elem = elem def __getattr__(self, name): return self.elem.get(name.upper()) class NullElem: def __getattr__(self, name): return None class ApiMetric: tag_re = re.compile("\s+") def id(self): group = self.group if self.group is not None else "" id_elements = [self.environment, self.grid.name, self.cluster.name, self.host.name, group, self.name] return str.lower(".".join(filter(lambda (e): e is not None, id_elements))) def api_dict(self): type, units = ApiMetric.metric_type(self.type, self.units, self.slope) metric = {'environment': self.environment, 'service': self.grid.name, 'cluster': self.cluster.name, 'host': self.host.name, 'id': self.id(), 'metric': self.name, 'instance': self.instance, 'group': self.group, 'title': self.title, 'tags': ApiMetric.parse_tags(self.host.tags), 'description': self.desc, 'sum': self.sum, 'num': self.num, 'value': ApiMetric.is_num(self.val), 'units': units, 'type': type, 'sampleTime': datetime.datetime.fromtimestamp( int(self.host.reported) + int(self.host.tn) - int(self.tn)).isoformat() + ".000Z", 'graphUrl': self.graph_url, 'dataUrl': self.data_url} return dict(filter(lambda (k, v): v is not None, metric.items())) @staticmethod def parse_tags(tag_string): if tag_string is None: return None else: tags = ApiMetric.tag_re.split(tag_string) if "unspecified" in tags: return list() else: return tags @staticmethod def metric_type(type, units, slope): if units == 'timestamp': return 'timestamp', 's' if 'int' in type or type == 'float' or type == 'double': return 'gauge', units if type == 'string': return 'text', units return 'undefined', units @staticmethod def is_num(val): try: return int(val) except ValueError: pass try: return float(val) except ValueError: return val def __str__(self): return "%s %s %s %s %s %s" % ( self.environment, self.grid.name, self.cluster.name, self.host.name, self.group, self.name) class Metric(Elem, ApiMetric): def __init__(self, elem, host, cluster, grid, environment): self.host = host self.cluster = cluster self.grid = grid self.environment = environment Elem.__init__(self, elem) self.metadata = dict() for extra_data in elem.findall("EXTRA_DATA"): for extra_elem in extra_data.findall("EXTRA_ELEMENT"): name = extra_elem.get("NAME") if name: self.metadata[name] = extra_elem.get('VAL') original_metric_name = self.name try: self.metadata['NAME'], self.metadata['INSTANCE'] = self.name.split('-', 1) except ValueError: self.metadata['INSTANCE'] = '' if self.name in ['fs_util', 'inode_util']: if self.instance == 'rootfs': self.metadata['INSTANCE'] = '/' else: self.metadata['INSTANCE'] = '/' + '/'.join(self.instance.split('-')) params = {"environment": self.environment, "service": self.grid.name, "cluster": self.cluster.name, "host": self.host.name, "metric": original_metric_name} url = '%s/ganglia/api/v1/metrics?' % API_SERVER for (k, v) in params.items(): if v is not None: url += "&%s=%s" % (k, quote(v)) self.data_url = url params = {"c": self.cluster.name, "h": self.host.name, "v": "0", "m": original_metric_name, "r": "1day", "z": "default", "vl": self.units.replace('%', 'percent'), "ti": self.title} url = '%sgraph.php?' % self.grid.authority for (k, v) in params.items(): if v is not None: url += "&%s=%s" % (k, quote(v)) self.graph_url = url def __getattr__(self, name): try: if self.metadata.has_key(name.upper()): return self.metadata[name.upper()] else: return Elem.__getattr__(self, name) except AttributeError: return None def html_dir(self): return 'ganglia-' + self.environment + '-' + self.grid.name # Artificial metric generated from the Host class HeartbeatMetric(ApiMetric): def __init__(self, host, cluster, grid, environment): self.host = host self.cluster = cluster self.grid = grid self.environment = environment self.val = int(host.tn) self.tn = 0 self.tags = host.tags self.name = "heartbeat" self.group = "ganglia" self.title = "Ganglia Agent Heartbeat" self.desc = "Ganglia agent heartbeat in seconds" self.type = 'uint16' self.units = 'seconds' self.slope = 'both' def __getattr__(self, name): return None class GangliaGmetad: hostname = "localhost" def __init__(self, environment, service, xml_port, interactive_port): self.environment = environment self.service = service self.xml_port = xml_port self.interactive_port = interactive_port def read_data_from_port(self, host, port, send=None): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.connect((host, port)) r, w, x = select.select([sock], [], [], 2) if not r: sock.close() return except socket.error, e: logger.warning('Could not open socket to %s:%d - %s', host, port, e) return try: if send is not None: sock.send(send) except socket.error, e: logger.warning('Could not send to %s:%d - %s', host, port, e) return buffer = "" while True: try: data = sock.recv(8192) except socket.error, e: logger.warning('Could not receive data from %s:%d - %s', host, port, e) return if not data: break buffer += data.decode("ISO-8859-1") sock.close() return buffer def read_xml_data(self): return self.read_data_from_port(self.hostname, self.xml_port) def read_xml_metrics(self): result = list() xml_data = self.read_xml_data() if xml_data: try: ganglia = ElementTree.XML(xml_data) except UnicodeEncodeError: logging.error('Could not parse XML data') return result else: return result for grid_elem in ganglia.findall("GRID"): grid = Elem(grid_elem) for cluster_elem in grid_elem.findall("CLUSTER"): cluster = Elem(cluster_elem) for host_elem in cluster_elem.findall("HOST"): host = Elem(host_elem) result.append(HeartbeatMetric(host, cluster, grid, self.environment)) for metric_elem in host_elem.findall("METRIC"): result.append(Metric(metric_elem, host, cluster, grid, self.environment)) return result def read_interactive_data(self): return self.read_data_from_port(self.hostname, self.interactive_port, '/?filter=summary\r\n') def read_interactive_metrics(self): result = list() interactive_data = self.read_interactive_data() if interactive_data: try: ganglia = ElementTree.XML(interactive_data) except ExpatError, e: logging.error('Could not parse XML data: %s', e) return result else: return result for grid_elem in ganglia.findall("GRID"): grid = Elem(grid_elem) for cluster_elem in grid_elem.findall("CLUSTER"): cluster = Elem(cluster_elem) for metric_elem in cluster_elem.findall("METRICS"): result.append(Metric(metric_elem, NullElem(), cluster, grid, self.environment)) return result def read_metrics(self): xml_metrics = self.read_xml_metrics() interactive_metrics = self.read_interactive_metrics() xml_metrics.extend(interactive_metrics) return xml_metrics class GangliaConfig: GANGLIA_PATH = '/etc/ganglia' def __init__(self): self.env_service_dict = self.parse_ganglia_config() def parse_ganglia_config(self): logger.info("Parsing ganglia configurations") result = dict() for file in glob.glob(os.path.join(GangliaConfig.GANGLIA_PATH, 'gmetad-*-*.conf')): m = re.search('gmetad-(\S+)-(\S+).conf', file) environment = m.group(1) service = m.group(2) xml_port = 0 interactive_port = 0 f = open(file).readlines() for line in f: m = re.search('xml_port\s+(\d+)', line) if m: xml_port = int(m.group(1)) m = re.search('interactive_port\s+(\d+)', line) if m: interactive_port = int(m.group(1)) ports = GangliaGmetad(environment, service, xml_port, interactive_port) result[(environment, service)] = ports logger.info('Found gmetad-%s-%s.conf with ports %d and %d', environment, service, ports.xml_port, ports.interactive_port) return result def get_gmetad_config(self): return self.env_service_dict.values() def get_gmetad_for(self, environment, service): def is_match(gmetad): env_match = (not environment) or (gmetad.environment in environment) service_match = (not service) or (gmetad.service in service) return env_match and service_match return filter(is_match, self.env_service_dict.values()) class GmetadData(): def __init__(self): self.data = dict() def update(self, gmetad): logger.info(" getting metrics for %s %s", gmetad.environment, gmetad.service) gmetad_metrics = gmetad.read_metrics() logger.info(" updated %d metrics for %s %s", len(gmetad_metrics), gmetad.environment, gmetad.service) self.data[(gmetad.environment, gmetad.service)] = gmetad_metrics return len(gmetad_metrics) def metrics_for(self, environment, service): try: return self.data[(environment, service)] except KeyError: return list() def metrics(self, gmetad): return self.metrics_for(gmetad.environment, gmetad.service) class GangliaPollThread(Thread): def run(self): while True: self.update_ganglia_data() def update_ganglia_data(self): gmetad_list = ganglia_config.get_gmetad_config() logger.info("Updating data from gmetad...") total_metrics = 0 for counter, gmetad in enumerate(gmetad_list): metrics_for_gmetad = ganglia_data.update(gmetad) total_metrics += metrics_for_gmetad logger.debug(" (%d/%d) updated %d metrics for %s %s", counter + 1, len(gmetad_list), metrics_for_gmetad, gmetad.environment, gmetad.service) time.sleep(0.2) logger.info("Done (found %d metrics)", total_metrics) if HEARTBEAT: os.system('/opt/alerta/sbin/alert-sender.py --heartbeat --origin ganglia-api --tag %s --quiet' % __version__) class ApiHandler(tornado.web.RequestHandler): def get(self): start = time.time() environment = self.get_arguments("environment") service = self.get_arguments("service") metric_list = self.get_arguments("metric") group_list = self.get_arguments("group") host_list = self.get_arguments("host") cluster_list = self.get_arguments("cluster") def emptyOrContains(list, value): return len(list) == 0 or value in list def is_match(metric): return (emptyOrContains(metric_list, metric.name) and emptyOrContains(group_list, metric.group) and emptyOrContains(host_list, metric.host.name) and emptyOrContains(cluster_list, metric.cluster.name)) gmetad_list = ganglia_config.get_gmetad_for(environment, service) metric_dicts = list() for gmetad in gmetad_list: metrics = ganglia_data.metrics(gmetad) for metric in filter(is_match, metrics): metric_dicts.append(metric.api_dict()) output_dict = { "metrics": metric_dicts, "status": "ok", "total": len(metric_dicts), "localTime": datetime.datetime.now().isoformat(), "time": "%.3f" % (time.time() - start) } self.write({"response": output_dict}) def main(): logger.info("Starting up Ganglia metric API v%s", __version__) # Write pid file if not already running if os.path.isfile(PIDFILE): pid = open(PIDFILE).read() try: os.kill(int(pid), 0) logging.error('Process with pid %s already exists, exiting', pid) sys.exit(1) except OSError: pass file(PIDFILE, 'w').write(str(os.getpid())) global ganglia_config ganglia_config = GangliaConfig() global ganglia_data ganglia_data = GmetadData() poll_thread = GangliaPollThread() poll_thread.daemon = True poll_thread.start() tornado.options.parse_command_line() application = tornado.web.Application([ (r"/ganglia/api/v1/metrics", ApiHandler), ]) http_server = tornado.httpserver.HTTPServer(application) http_server.listen(options.port) tornado.ioloop.IOLoop.instance().start() if __name__ == "__main__": main() -- https://mail.python.org/mailman/listinfo/python-list