Ottomata has uploaded a new change for review. https://gerrit.wikimedia.org/r/101431
Change subject: Using custom ganglia module instead of Logster.
......................................................................

Using custom ganglia module instead of Logster.

I never got logster to work with positive counter values properly, so
rather than continue hammering on that, here is an attempt at a custom
ganglia module instead of a nice generic abstraction :/

Change-Id: I8b5b427afd04b14621c38cae6d5b3a3627e41612
---
D files/VarnishkafkaLogster.py
A files/varnishkafka_ganglia.py
M manifests/monitoring.pp
A templates/varnishkafka_ganglia.pyconf.erb
4 files changed, 575 insertions(+), 130 deletions(-)

  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet/varnishkafka refs/changes/31/101431/1

diff --git a/files/VarnishkafkaLogster.py b/files/VarnishkafkaLogster.py
deleted file mode 100644
index d0c79bf..0000000
--- a/files/VarnishkafkaLogster.py
+++ /dev/null
@@ -1,100 +0,0 @@
-### VarnishkafkaLogster is a subclass of JsonLogster.
-### It is meant to parse varnishkafka
-### (https://github.com/wikimedia/operations-software-varnish-varnishkafka)
-### JSON statistics.
-###
-### Example:
-### sudo ./logster --dry-run --output=ganglia VarnishkafkaLogster /var/cache/varnishkafka.stats.json
-###
-
-from logster.parsers.JsonLogster import JsonLogster
-from logster.logster_helper import MetricObject
-
-class VarnishkafkaLogster(JsonLogster):
-    # Default ganglia slope is 'both', aka GAUGE.
-    # These keys should be given a slope of
-    # 'positive' aka COUNTER.
-    counter_metrics = [
-        'tx',
-        'txbytes',
-        'txerrs',
-        'txmsgs',
-
-        'rx',
-        'rxbytes'
-        'rxerrs',
-
-        'kafka_drerr',
-        'scratch_toosmall',
-        'txerr',
-        'trunc',
-        'scratch_tmpbufs',
-    ]
-
-    # metric keys to skip.
-    skip_metrics = [
-        'app_offset',
-        'commited offset',
-        'desired',
-        'eof_offset',
-        'fetch_state',
-        'fetchq_cnt',
-        'fetchq_cnt',
-        'leader',
-        'lp_curr'
-        'name',
-        'next_offset',
-        'nodeid',
-        'partition',
-        'query_offset',
-        'seq',
-        'time',
-        'topic',
-        'toppars',
-        'ts',
-        'unknown',
-    ]
-
-    def get_metric_object(self, metric_name, metric_value):
-        '''
-        Overrides JsonLogster's get_metric_object() method to
-        manually set slope to positive for counter metrics.
-        '''
-
-        metric_object = JsonLogster.get_metric_object(self, metric_name, metric_value)
-
-        metric_slope = ''
-
-        if metric_name.split(self.key_separator)[-1] in VarnishkafkaLogster.counter_metrics:
-            metric_slope = 'positive'
-
-        metric_object.slope = metric_slope
-        return metric_object
-
-    def key_filter(self, key):
-        '''
-        Overrides JsonLogster's key_filter method to
-        filter out irrelevant metrics, and to transform
-        the keys of some to make them more readable.
-        '''
-
-        if key in VarnishkafkaLogster.skip_metrics:
-            return False
-        # prepend appropriate rdkafka or varnishkafka to the key,
-        # depdending on where the metric has come from.
-        elif key == 'varnishkafka':
-            key = 'kafka{0}varnishkafka'.format(self.key_separator)
-        elif key == 'kafka':
-            key = 'kafka{0}rdkafka'.format(self.key_separator)
-        # don't send any bootstrap rdkafka metrics
-        elif 'bootstrap' in key:
-            return False
-        # replace any key separators in the key with '-'
-        elif self.key_separator in key:
-            # this won't do anything if key_separator is '-'
-            key = key.replace(self.key_separator, '-')
-        # don't send anything that starts with -
-        elif key.startswith('-'):
-            return False
-
-        return key
diff --git a/files/varnishkafka_ganglia.py b/files/varnishkafka_ganglia.py
new file mode 100644
index 0000000..8e4af9e
--- /dev/null
+++ b/files/varnishkafka_ganglia.py
@@ -0,0 +1,554 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+    Gmond module for posting varnishkafka statistics to Ganglia.
+
+    :copyright: (c) 2013 Wikimedia Foundation
+    :author: Andrew Otto <o...@wikimedia.org>
+    :license: GPLv2+
+"""
+# for true division when calculating rates of change
+from __future__ import division
+
+import optparse
+import time
+import json
+import logging
+import sys
+import os
+import copy
+
+logger = logging.getLogger('varnishkafka_ganglia')
+logger.setLevel(logging.INFO)
+
+# metric keys to skip reporting to ganglia
+skip_metrics = [
+    'app_offset',
+    'commited offset',
+    'desired',
+    'eof_offset',
+    'fetch_state',
+    'fetchq_cnt',
+    'leader',
+    'lp_curr',
+    'name',
+    'next_offset',
+    'nodeid',
+    'partition',
+    'query_offset',
+    'seq',
+    'time',
+    'topic',
+    'toppars',
+    'ts',
+    'unknown',
+]
+
+
+def flatten_object(node, separator='.', key_filter_callback=None, parent_keys=[]):
+    '''
+    Recurses through dicts and/or lists and flattens them
+    into a single level dict of key: value pairs.  Each
+    key consists of all of the recursed keys joined by
+    separator.  If key_filter_callback is callable,
+    it will be called with each key.  It should return
+    either a new key which will be used in the final full
+    key string, or False, which will indicate that this
+    key and its value should be skipped.
+    '''
+    flattened = {}
+
+    try:
+        iterator = node.iteritems()
+    except AttributeError:
+        iterator = enumerate(node)
+
+    for key, child in iterator:
+        # If key_filter_callback was provided,
+        # then call it on the key.  If the returned
+        # key is False, then we know to skip it.
+        if callable(key_filter_callback):
+            key = key_filter_callback(key)
+            if key is False:
+                continue
+
+        # append this key to the end of all keys seen so far
+        all_keys = parent_keys + [str(key)]
+
+        if hasattr(child, '__iter__'):
+            # merge the child items all together
+            flattened.update(flatten_object(child, separator, key_filter_callback, all_keys))
+        else:
+            # '/' is not allowed in key names.
+            # Ganglia writes files based on key names
+            # and doesn't escape these in the path.
+            final_key = separator.join(all_keys).replace('/', separator)
+            flattened[final_key] = child
+
+    return flattened
+
+
+def tail(filename, n=2):
+    '''
+    Tails the last n lines from filename and returns them in a list.
+    '''
+    cmd = '/usr/bin/tail -n {0} {1}'.format(n, filename)
+
+    f = os.popen(cmd)
+    lines = f.read().strip().split('\n')
+    f.close()
+    return lines
+
+
+def infer_metric_type(value):
+    '''
+    Infers ganglia type from the
+    variable type of value.
+    '''
+    if isinstance(value, float):
+        metric_type = 'float'
+    # use uint for int and long
+    # (bool is a subtype of int, so bools also report as uint).
+    elif isinstance(value, int) or isinstance(value, long):
+        metric_type = 'uint'
+    else:
+        metric_type = 'string'
+
+    return metric_type
+
+
+class VarnishkafkaStats(object):
+    '''
+    Class representing most recent varnishkafka stats found
+    in the varnishkafka.stats.json file.  Calling update_stats()
+    will cause this class to read the most recent JSON objects from
+    this file, and parse them into a flattened stats dict suitable
+    for easy querying to send to ganglia.
+    '''
+
+    # Any key that ends with one of these
+    # will cause update_stats() to
+    # calculate a rate of change for this stat
+    # since the last run and insert it into the
+    # flattened_stats dict suffixed by
+    # per_second_key_suffix.
+    counter_stats = [
+        'tx',
+        'txbytes',
+        'txerrs',
+        'txmsgs',
+
+        'rx',
+        'rxbytes',
+        'rxerrs',
+
+        'kafka_drerr',
+        'scratch_toosmall',
+        'txerr',
+        'trunc',
+        'scratch_tmpbufs',
+    ]
+
+    per_second_key_suffix = 'per_second'
+
+    def __init__(self, stats_file='/var/cache/varnishkafka/varnishkafka.stats.json', key_separator='.'):
+        self.stats_file = stats_file
+        self.key_separator = key_separator
+
+        self.flattened_stats = {}
+        self.flattened_stats_previous = {}
+
+        # varnishkafka outputs two distinct types of JSON objects
+        # each time it outputs stats.  One is for varnishkafka stats,
+        # the other is for librdkafka stats.  We will read this
+        # many lines from the end of the stats file per gmond check.
+        self.distinct_lines_per_interval = 2
+
+        # timestamp keys from each distinct json object as
+        # they will appear in the self.flattened_stats dict.
+        # These are used for detecting changes in json data
+        # in the stats_file.
+        self.timestamp_keys = [
+            'kafka{0}varnishkafka{0}time'.format(self.key_separator),
+            'kafka{0}rdkafka{0}time'.format(self.key_separator)
+        ]
+
+    def key_filter(self, key):
+        '''
+        Filters out irrelevant varnishkafka metrics and transforms
+        the keys of some to make them more readable.
+        '''
+        # prepend the appropriate rdkafka or varnishkafka to the key,
+        # depending on where the metric has come from.
+        if key == 'varnishkafka':
+            key = 'kafka{0}varnishkafka'.format(self.key_separator)
+        elif key == 'kafka':
+            key = 'kafka{0}rdkafka'.format(self.key_separator)
+        # don't send any bootstrap rdkafka metrics
+        elif 'bootstrap' in key:
+            return False
+        # replace any key separators in the key with '-'
+        elif self.key_separator in key:
+            # this won't do anything if key_separator is '-'
+            key = key.replace(self.key_separator, '-')
+        # don't send anything that starts with -
+        elif key.startswith('-'):
+            return False
+
+        return key
+
+    def is_counter_stat(self, key):
+        '''
+        Returns true if this (flattened or leaf) key is a counter stat,
+        meaning it should always increase during a varnishkafka instance's
+        lifetime.
+        '''
+        return (key.split(self.key_separator)[-1] in self.counter_stats)
+
+    def tail_stats_file(self):
+        '''
+        Returns the latest distinct_lines_per_interval lines from stats_file as a list.
+        '''
+        logger.info('Reading latest varnishkafka stats from {0}'.format(self.stats_file))
+        return tail(self.stats_file, self.distinct_lines_per_interval)
+
+    def get_latest_stats_from_file(self):
+        '''
+        Reads the latest stats out of stats_file and returns them as a flattened stats dict.
+        '''
+        lines = self.tail_stats_file()
+        stats = {}
+        for line in lines:
+            stats.update(flatten_object(json.loads(line), self.key_separator, self.key_filter))
+
+        return stats
+
+    def update_stats(self, stats=None):
+        '''
+        Reads the latest stats out of stats_file and updates the stats
+        attribute of this class.  If the data has changed since the last
+        update, new counter rate of change per second stats will also be
+        calculated.
+        '''
+        # Save the current stats into the previous stats
+        # object so we can compute rate of change over durations
+        # for counter metrics.
+        self.flattened_stats_previous = copy.deepcopy(self.flattened_stats)
+
+        # If stats weren't manually passed in, then go ahead and read the
+        # most recent stats out of the stats_file.
+        if not stats:
+            stats = self.get_latest_stats_from_file()
+        self.flattened_stats.update(stats)
+
+        # If the stats we have now have actually changed since the
+        # last time we updated the stats, then go ahead and compute
+        # new per_second change rates for each counter stat.
+        if self.have_stats_changed_since_last_update():
+            logger.debug('varnishkafka stats have changed since last update.')
+            self.update_counter_rate_stats()
+        else:
+            logger.debug('varnishkafka stats have not changed since last update.')
+
+    def stat_rate_of_change(self, key):
+        '''
+        Given a stat key name, computes (current stat - previous stat) / update interval.
+        The update interval is computed from the timestamp that comes with the json stats, not
+        the time since update_stats() was last called.
+        '''
+        key_list = key.split(self.key_separator)
+        timestamp_key = '{1}{0}{2}{0}time'.format(self.key_separator, key_list[0], key_list[1])
+
+        # if we don't yet have a previous value from which to calculate a
+        # rate, just return 0 for now
+        if not self.flattened_stats or not self.flattened_stats_previous or timestamp_key not in self.flattened_stats_previous:
+            return 0.0
+
+        interval = self.flattened_stats[timestamp_key] - self.flattened_stats_previous[timestamp_key]
+        # if the timestamps are the same, then just return 0.
+        if interval == 0:
+            return 0.0
+
+        # else calculate the per second rate of change
+        return (self.flattened_stats[key] - self.flattened_stats_previous[key]) / interval
+
+    def update_counter_rate_stats(self):
+        '''
+        For each counter stat, this will add an extra stat
+        to the stats for rate of change per second.
+        '''
+        for key, value in self.flattened_stats.items():
+            if self.is_counter_stat(key):
+                per_second_key = '{0}{1}{2}'.format(key, self.key_separator, self.per_second_key_suffix)
+                rate = self.stat_rate_of_change(key)
+                self.flattened_stats[per_second_key] = rate
+
+    def have_stats_changed_since_last_update(self):
+        '''
+        Returns true if either of the timestamp keys in the stats object
+        differs from the timestamp keys in the previously collected stats object.
+        '''
+        if not self.flattened_stats or not self.flattened_stats_previous:
+            return True
+
+        for key in self.timestamp_keys:
+            if self.flattened_stats[key] != self.flattened_stats_previous[key]:
+                return True
+
+        return False
+
+
+#
+# Gmond Interface
+#
+
+# global VarnishkafkaStats object, will be
+# instantiated by metric_init()
+varnishkafka_stats = None
+time_max = 15
+last_run_timestamp = 0
+
+
+def metric_handler(name):
+    """Get value of particular metric; part of Gmond interface"""
+    global varnishkafka_stats
+    global time_max
+    global last_run_timestamp
+
+    seconds_since_last_run = time.time() - last_run_timestamp
+    if (seconds_since_last_run >= time_max):
+        logger.debug('Updating varnishkafka_stats since it has been {0} seconds, which is more than tmax of {1}'.format(seconds_since_last_run, time_max))
+        varnishkafka_stats.update_stats()
+        last_run_timestamp = time.time()
+
+    logger.debug('metric_handler called for {0}'.format(name))
+    return varnishkafka_stats.flattened_stats[name]
+
+
+def metric_init(params):
+    """Initialize; part of Gmond interface"""
+    global varnishkafka_stats
+    global time_max
+    global last_run_timestamp
+
+    stats_file = params.get('stats_file', '/var/cache/varnishkafka/varnishkafka.stats.json')
+    key_separator = params.get('key_separator', '.')
+    ganglia_groups = params.get('groups', 'kafka')
+    time_max = int(params.get('tmax', time_max))
+
+    varnishkafka_stats = VarnishkafkaStats(stats_file, key_separator)
+    # Run update_stats() so that we'll have a list of stats keys that will
+    # be sent to ganglia.  We can use this to build the description dicts
+    # that metric_init is supposed to return.
+    # NOTE: This requires that the stats file already have some data in it.
+    varnishkafka_stats.update_stats()
+    last_run_timestamp = time.time()
+
+    descriptions = []
+
+    # iterate through the initial set of stats and create
+    # description dictionary objects for each.
+    for key, value in varnishkafka_stats.flattened_stats.items():
+        # skip any keys that are in skip_metrics
+        if key.split(key_separator)[-1] in skip_metrics:
+            continue
+
+        # value_type must be one of
+        # string | uint | float | double.
+        metric_type = infer_metric_type(value)
+        # if value is a bool, then convert it to an int
+        if isinstance(value, bool):
+            value = int(value)
+
+        if metric_type == 'uint':
+            metric_format = '%d'
+        elif metric_type == 'float' or metric_type == 'double':
+            metric_format = '%f'
+        else:
+            metric_format = '%s'
+
+        # try to infer some useful units from the key name.
+        if 'bytes' in key:
+            metric_units = 'bytes'
+        elif 'tx' in key:
+            metric_units = 'transmits'
+        elif 'rx' in key:
+            metric_units = 'receives'
+        elif 'rtt' in key:
+            metric_units = 'microseconds'
+        else:
+            metric_units = ''
+
+        if key.endswith(varnishkafka_stats.per_second_key_suffix):
+            metric_units = ' '.join([metric_units, 'per second'])
+
+        descriptions.append({
+            'name': key,
+            'call_back': metric_handler,
+            'time_max': time_max,
+            'value_type': metric_type,
+            'units': metric_units,
+            'slope': 'both',
+            'format': metric_format,
+            'description': '',
+            'groups': ganglia_groups,
+        })
+
+    return descriptions
+
+
+def metric_cleanup():
+    """Teardown; part of Gmond interface"""
+    pass
+
+
+# To run tests:
+#   python -m unittest varnishkafka_ganglia
+import unittest
+
+
+class TestVarnishkafkaGanglia(unittest.TestCase):
+    def setUp(self):
+        self.key_separator = '&'
+        self.varnishkafka_stats = VarnishkafkaStats('/tmp/test-varnishkafka.stats.json', self.key_separator)
+
+        # self.json_line = '{ "1.1": { "value1": 0, "value2": "hi", "1.2": { "value3": 0.1, "value4": false } }, "2.1": ["a","b"] }'
+        self.json_data = {
+            '1.1': {
+                'value1': 0,
+                'value2': 'hi',
+                '1.2': {
+                    'value3': 0.1,
+                    'value4': False,
+                }
+            },
+            '2.1': ['a','b'],
+            # '/' should be replaced with key_separator
+            '3/1': 'nonya',
+            'notme': 'nope',
+            'kafka': {
+                'varnishkafka': {
+                    'time': time.time(),
+                    'counter': { self.varnishkafka_stats.counter_stats[0]: 0 },
+                },
+                'rdkafka': { 'time': time.time() }
+            },
+        }
+        self.flattened_should_be = {
+            '1.1&value1': 0,
+            '1.1&valuetwo': 'hi',
+            '1.1&1.2&value3': 0.1,
+            '1.1&1.2&value4': False,
+            '2.1&0': 'a',
+            '2.1&1': 'b',
+            # '/' should be replaced with key_separator
+            '3&1': 'nonya',
+            'kafka&varnishkafka&time': self.json_data['kafka']['varnishkafka']['time'],
+            'kafka&varnishkafka&counter&{0}'.format(self.varnishkafka_stats.counter_stats[0]): 0,
+            'kafka&rdkafka&time': self.json_data['kafka']['rdkafka']['time'],
+        }
+
+    def key_filter_callback(self, key):
+        if key == 'value2':
+            key = 'valuetwo'
+        if key == 'notme':
+            key = False
+
+        return key
+
+    def test_flatten_object(self):
+        flattened = flatten_object(self.json_data, self.key_separator, self.key_filter_callback)
+        self.assertEquals(flattened, self.flattened_should_be)
+
+    def test_is_counter_stat(self):
+        self.assertTrue(self.varnishkafka_stats.is_counter_stat(self.varnishkafka_stats.counter_stats[0]))
+        self.assertTrue(self.varnishkafka_stats.is_counter_stat('whatever&it&no&matter&' + self.varnishkafka_stats.counter_stats[0]))
+        self.assertFalse(self.varnishkafka_stats.is_counter_stat('notone'))
+
+    def test_update_stats(self):
+        self.varnishkafka_stats.update_stats(self.flattened_should_be)
+        self.assertEquals(self.varnishkafka_stats.flattened_stats['1.1&valuetwo'], self.flattened_should_be['1.1&valuetwo'])
+
+        previous_value = self.varnishkafka_stats.flattened_stats['1.1&valuetwo']
+        self.flattened_should_be['1.1&valuetwo'] = 1
+        self.varnishkafka_stats.update_stats(self.flattened_should_be)
+        self.assertEquals(self.varnishkafka_stats.flattened_stats['1.1&valuetwo'], self.flattened_should_be['1.1&valuetwo'])
+        self.assertEquals(self.varnishkafka_stats.flattened_stats_previous['1.1&valuetwo'], previous_value)
+
+    def test_rate_of_change_update_stats(self):
+        counter_key = 'kafka{0}varnishkafka{0}counter{0}{1}'.format(self.key_separator, self.varnishkafka_stats.counter_stats[0])
+        self.varnishkafka_stats.update_stats(self.flattened_should_be)
+        previous_value = self.flattened_should_be[counter_key]
+
+        # increment the counter and the timestamp to make VarnishkafkaStats calculate a new per_second rate
+        self.flattened_should_be[counter_key] += 101
+        self.flattened_should_be['kafka&varnishkafka&time'] += 100.0
+        self.varnishkafka_stats.update_stats(self.flattened_should_be)
+
+        self.assertEquals(self.varnishkafka_stats.flattened_stats_previous[counter_key], previous_value)
+        self.assertEquals(self.varnishkafka_stats.flattened_stats[counter_key], self.flattened_should_be[counter_key])
+        self.assertEquals(self.varnishkafka_stats.flattened_stats['kafka&varnishkafka&time'], self.flattened_should_be['kafka&varnishkafka&time'])
+
+        per_second_key = '{0}{1}{2}'.format(counter_key, self.key_separator, self.varnishkafka_stats.per_second_key_suffix)
+        rate_should_be = (self.flattened_should_be[counter_key] - self.varnishkafka_stats.flattened_stats_previous[counter_key]) / 100.0
+        self.assertEquals(self.varnishkafka_stats.flattened_stats[per_second_key], rate_should_be)
+
+
+if __name__ == '__main__':
+    # When invoked as a standalone script, run a self-test by querying each
+    # metric descriptor and printing it out.
+    ch = logging.StreamHandler(sys.stdout)
+    formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
+    ch.setFormatter(formatter)
+    logger.addHandler(ch)
+
+    cmdline = optparse.OptionParser(usage="usage: %prog [options] statsfile")
+    cmdline.add_option('--tmax', '-t', action='store', default=15,
+                       help='time_max for ganglia python module metrics.')
+    cmdline.add_option('--key-separator', '-k', dest='key_separator', default='.',
+                       help='Key separator for flattened json object key name.  Default: \'.\'  \'/\' is not allowed.')
+    cmdline.add_option('--debug', '-D', action='store_true', default=False,
+                       help='Provide more verbose logging for debugging.')
+
+    cli_options, arguments = cmdline.parse_args()
+
+    if (len(arguments) != 1):
+        cmdline.print_help()
+        cmdline.error("Must supply statsfile argument.")
+
+    if (cli_options.debug):
+        logger.setLevel(logging.DEBUG)
+
+    cli_options.stats_file = arguments[0]
+
+    # Turn the optparse.Values object into a regular dict
+    # so we can pass it to metric_init
+    params = vars(cli_options)
+
+    print('----------')
+    metric_descriptions = metric_init(params)
+    while True:
+        for metric in metric_descriptions:
+            value = metric['call_back'](metric['name'])
+            print("{0} => {1} {2}".format(metric['name'], value, metric['units']))
+        time.sleep(float(params['tmax']))
+        print('----------')
diff --git a/manifests/monitoring.pp b/manifests/monitoring.pp
index ba9c976..0e78eba 100644
--- a/manifests/monitoring.pp
+++ b/manifests/monitoring.pp
@@ -1,41 +1,20 @@
 # == Class varnishkafka::monitoring
-# Uses logster (https://github.com/wikimedia/operations-debs-logster)
-# to tail varnishkafka.stats.json file send stats to Ganglia.
+# Installs the varnishkafka python ganglia module.
 #
-# TODO: Support more than logster ganglia output via
-# class parameters.
 class varnishkafka::monitoring(
     $ensure = 'present'
 ) {
     Class['varnishkafka'] -> Class['varnishkafka::monitoring']

-    # varnishkafka monitoring is done via the logster package.
-    package { 'logster':
-        ensure  => 'installed',
-        # don't bother doing this unless ganglia is installed
-        require => Package['ganglia-monitor']
+    $log_statistics_file     = $::varnishkafka::log_statistics_file
+    $log_statistics_interval = $::varnishkafka::log_statistics_interval
+
+    file { '/usr/lib/ganglia/python_modules/varnishkafka.py':
+        source  => 'puppet:///modules/varnishkafka/varnishkafka_ganglia.py',
+        require => Package['ganglia-monitor-python'],
     }

-    # put the VarnishkafkaLogster.py module in place
-    if !defined(File['/usr/local/share/logster']) {
-        file { '/usr/local/share/logster':
-            ensure => 'directory',
-        }
-    }
-
-    # Custom JsonLogster parser subclass to filter and transform
-    # a few varnishkafka stats JSON keys.
-    file { '/usr/local/share/logster/VarnishkafkaLogster.py':
-        source  => 'puppet:///modules/varnishkafka/VarnishkafkaLogster.py',
-        require => File['/usr/local/share/logster'],
-    }
-
-    # Run logster using the VarnishkafkaLogster parser and send updated stats to Ganglia.
-    $cron_command = "export PYTHONPATH=\$PYTHONPATH:/usr/local/share/logster && /usr/bin/logster --output ganglia --gmetric-options='--group=kafka --tmax=60' VarnishkafkaLogster.VarnishkafkaLogster ${varnishkafka::log_statistics_file}"
-    cron { 'varnishkafka-stats-to-ganglia':
-        ensure  => $ensure,
-        command => $cron_command,
-        minute  => '*/1',
-        require => [Package['logster'], File['/usr/local/share/logster/VarnishkafkaLogster.py']]
+    file { '/etc/ganglia/conf.d/varnishkafka.pyconf':
+        content => template('varnishkafka/varnishkafka_ganglia.pyconf.erb'),
+        require => Package['ganglia-monitor-python'],
     }
 }
diff --git a/templates/varnishkafka_ganglia.pyconf.erb b/templates/varnishkafka_ganglia.pyconf.erb
new file mode 100644
index 0000000..ee624be
--- /dev/null
+++ b/templates/varnishkafka_ganglia.pyconf.erb
@@ -0,0 +1,12 @@
+modules {
+    module {
+        name = "varnishkafka"
+        language = "python"
+        param stats_file { value = "<%= log_statistics_file %>" }
+        param tmax { value = <%= log_statistics_interval %> }
+    }
+}
+collection_group {
+    collect_every = <%= log_statistics_interval %>
+    time_threshold = <%= log_statistics_interval %>
+}

--
To view, visit https://gerrit.wikimedia.org/r/101431
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8b5b427afd04b14621c38cae6d5b3a3627e41612
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet/varnishkafka
Gerrit-Branch: master
Gerrit-Owner: Ottomata <o...@wikimedia.org>
_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
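
P.S. For anyone who wants to poke at the new module without a running gmond, the gmond callbacks can also be
driven directly from Python (the file's own __main__ block does the same thing in a loop). This is only a
sketch, not part of the change: it assumes varnishkafka_ganglia.py is importable and that varnishkafka has
already written at least two JSON stats lines to the default stats file; the parameter names mirror the
pyconf template above.

    # Hypothetical standalone check of the gmond interface; not part of this change.
    import varnishkafka_ganglia

    descriptors = varnishkafka_ganglia.metric_init({
        'stats_file': '/var/cache/varnishkafka/varnishkafka.stats.json',
        'key_separator': '.',
        'groups': 'kafka',
        'tmax': 15,
    })
    for d in descriptors:
        # every descriptor's call_back is metric_handler()
        print('{0} = {1} {2}'.format(d['name'], d['call_back'](d['name']), d['units']))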