Dzahn has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/396088 )
Change subject: kafkatee: remove Ganglia monitoring class and script
......................................................................

kafkatee: remove Ganglia monitoring class and script

In I897c847a0ac382d34e2d7 the inclusion of this class has been removed from
the logging::kafkatee::webrequest::base class. Now the files can be deleted
in the submodule. Ganglia is going away.

Bug: T177225
Change-Id: If1ed1245c6e3c09b101e0b06035a80e2b4c38581
---
D files/kafkatee_ganglia.py
D manifests/monitoring.pp
2 files changed, 0 insertions(+), 665 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet/kafkatee refs/changes/88/396088/1

diff --git a/files/kafkatee_ganglia.py b/files/kafkatee_ganglia.py
deleted file mode 100755
index acda0b8..0000000
--- a/files/kafkatee_ganglia.py
+++ /dev/null
@@ -1,629 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-  Gmond module for posting metrics from kafkatee.
-
-  :copyright: (c) 2014 Wikimedia Foundation
-  :author: Andrew Otto <o...@wikimedia.org>
-  :license: Apache-2+
-"""
-# for true division when calculating rates of change
-from __future__ import division
-
-import copy
-import json
-import logging
-import optparse
-import os
-import sys
-import time
-
-logger = logging.getLogger('kafkatee')
-
-
-# metric keys to skip reporting to ganglia
-skip_metrics = [
-    'app_offset',
-    'desired',
-    'fetch_state',
-    'fetchq_cnt',
-    'leader',
-    'name',
-    'nodeid',
-    'partition',
-    'query_offset',
-    'state',
-    'time',
-    'topic',
-    'toppars',
-    'ts',
-    'unknown',
-    'xmit_msgq_cnt',
-    'xmit_msgq_bytes',
-    'outbuf_cnt',
-    'tx',
-    'txbytes',
-    'txmsgs',
-    'txerrs',
-    'txretries',
-]
-
-
-def flatten_object(node, separator='.', key_filter_callback=None, parent_keys=None):
-    '''
-    Recurses through dicts and/or lists and flattens them
-    into a single level dict of key: value pairs. Each
-    key consists of all of the recursed keys joined by
-    separator. If key_filter_callback is callable,
-    it will be called with each key. It should return
-    either a new key which will be used in the final full
-    key string, or False, which will indicate that this
-    key and its value should be skipped.
-    '''
-    if parent_keys is None:
-        parent_keys = []
-
-    flattened = {}
-
-    try:
-        iterator = node.iteritems()
-    except AttributeError:
-        iterator = enumerate(node)
-
-    for key, child in iterator:
-        # If key_filter_callback was provided,
-        # then call it on the key. If the returned
-        # key is false, then we know to skip it.
-        if callable(key_filter_callback):
-            key = key_filter_callback(key)
-            if key is False:
-                continue
-
-        # append this key to the end of all keys seen so far
-        all_keys = parent_keys + [str(key)]
-
-        if hasattr(child, '__iter__'):
-            # merge the child items all together
-            flattened.update(flatten_object(child, separator, key_filter_callback, all_keys))
-        else:
-            # '/' is not allowed in key names.
-            # Ganglia writes files based on key names
-            # and doesn't escape these in the path.
-            final_key = separator.join(all_keys).replace('/', separator)
-            flattened[final_key] = child
-
-    return flattened
-
-
-def tail(filename, n=2):
-    '''
-    Tails the last n lines from filename and returns them in a list.
-    '''
-
-    cmd = '/usr/bin/tail -n {0} {1}'.format(n, filename)
-
-    f = os.popen(cmd)
-    lines = f.read().strip().split('\n')
-    f.close()
-    return lines
-
-
-def infer_metric_type(value):
-    '''
-    Infers ganglia type from the
-    variable type of value.
-    '''
-    if isinstance(value, float):
-        metric_type = 'float'
-    # use uint for int and long.
-    # If bool, use 'string'. (bool is a subtype of int)
-    elif isinstance(value, int) or isinstance(value, long):
-        metric_type = 'uint'
-    else:
-        metric_type = 'string'
-
-    return metric_type
-
-
-class KafkateeStats(object):
-    '''
-    Class representing most recent kafkatee stats found
-    in the kafkatee.stats.json file. Calling update_stats()
-    will cause this class to read the most recent JSON objects from
-    this file, and parse them into a flattened stats dict suitable
-    for easy querying to send to ganglia.
-    '''
-
-    # Any key that ends with one of these
-    # will cause update_stats() to
-    # calculate a rate of change for this stat
-    # since the last run and insert it into
-    # the flattened_stats dict suffixed by
-    # per_second_key_suffix.
-    counter_stats = [
-        'rx',
-        'rxbytes',
-        'rxerrs',
-
-        'next_offset',
-        'eof_offset',
-        'committed_offset',
-    ]
-
-    per_second_key_suffix = 'per_second'
-
-    def __init__(self, stats_file='/var/cache/kafkatee/kafkatee.stats.json', key_separator='.'):
-        self.stats_file = stats_file
-        self.key_separator = key_separator
-
-        # NOTE: It might be more elegant to
-        # store the JSON object as it comes back from stats_file,
-        # rather than keeping the state in the flattened hash.
-
-        # latest flattened stats as read from stats_file
-        self.flattened_stats = {}
-        # previous flattened stats as read from stats_file
-        self.flattened_stats_previous = {}
-
-        # kafkatee outputs rdkafka stats, keyed by 'kafka'
-        self.distinct_lines_per_interval = 1
-
-        # timestamp keys from each distinct json object as
-        # they will appear in the self.flattened_stats dict.
-        # These are used for detecting changes in json data
-        # in the stats_file.
-        self.timestamp_keys = [
-            key_separator.join(['kafka', 'rdkafka', 'time']),
-        ]
-
-    def key_filter(self, key):
-        '''
-        Filters out irrelevant kafkatee metrics and transforms
-        the keys of some to make them more readable.
-        '''
-
-        # prepend appropriate rdkafka to the key,
-        # depending on where the metric has come from.
-        if key == 'kafka':
-            key = self.key_separator.join(['kafka', 'rdkafka'])
-        # don't send any bootstrap rdkafka metrics
-        elif 'bootstrap' in key:
-            return False
-        # replace any key separators in the key with '-'
-        elif self.key_separator in key:
-            # this won't do anything if key_separator is '-'
-            key = key.replace(self.key_separator, '-')
-        # don't send anything that starts with -
-        elif key.startswith('-'):
-            return False
-
-        return key
-
-    def is_counter_stat(self, key):
-        '''
-        Returns true if this (flattened or leaf) key is a counter stat,
-        meaning it should always increase during a kafkatee instance's
-        lifetime.
-        '''
-        return (key.split(self.key_separator)[-1] in self.counter_stats)
-
-    def tail_stats_file(self):
-        '''
-        Returns the latest distinct_lines_per_interval lines from stats_file as a list.
-        '''
-        logger.info('Reading latest kafkatee stats from {0}'.format(self.stats_file))
-        return tail(self.stats_file, self.distinct_lines_per_interval)
-
-    def get_latest_stats_from_file(self):
-        '''
-        Reads the latest stats out of stats_file and returns the parsed JSON object.
-        '''
-        lines = self.tail_stats_file()
-        stats = {}
-        for line in lines:
-            stats.update(flatten_object(json.loads(line), self.key_separator, self.key_filter))
-
-        return stats
-
-    def update_stats(self, stats=None):
-        '''
-        Reads the latest stats out of stats_file and updates the stats
-        attribute of this class. If the data has changed since the last
-        update, new counter rate of change per second stats will also be
-        calculated.
-        '''
-        # Save the current stats into the previous stats
-        # objects so we can compute rate of change over durations
-        # for counter metrics.
-        self.flattened_stats_previous = copy.deepcopy(self.flattened_stats)
-
-        # If stats weren't manually passed in, then go ahead and read the
-        # most recent stats out of the stats_file.
-        if not stats:
-            stats = self.get_latest_stats_from_file()
-        self.flattened_stats.update(stats)
-
-        # If the stats we have now have actually changed since the
-        # last time we updated the stats, then go ahead and compute
-        # new per_second change rates for each counter stat.
-        if self.have_stats_changed_since_last_update():
-            logger.debug('kafkatee stats have changed since last update.')
-            self.update_counter_rate_stats()
-        else:
-            logger.debug('kafkatee stats have not changed since last update.')
-
-    def stat_rate_of_change(self, key):
-        '''
-        Given a value for stat key name, computes
-        (current stat - previous stat) / update interval.
-        Update interval is computed from the timestamp that comes with the
-        JSON stats, not the time since update_stats() was last called.
-        '''
-        # The timestamp will be keyed as 'kafka.rdkafka.time'
-        timestamp_key = self.key_separator.join(key.split(self.key_separator)[0:2] + ['time'])
-
-        # if we don't yet have a previous value from which to calculate a
-        # rate, just return 0 for now
-        if (not self.flattened_stats or not self.flattened_stats_previous or
-                timestamp_key not in self.flattened_stats_previous):
-            return 0.0
-
-        interval = self.flattened_stats[timestamp_key] - \
-            self.flattened_stats_previous[timestamp_key]
-        # if the timestamps are the same, then just return 0.
-        if interval == 0:
-            return 0.0
-
-        # else calculate the per second rate of change
-        return (self.flattened_stats[key] - self.flattened_stats_previous[key]) / interval
-
-    def update_counter_rate_stats(self):
-        '''
-        For each counter stat, this will add an extra stat
-        to the stats for rate of change per second.
-        '''
-        for key in filter(self.is_counter_stat, self.flattened_stats.keys()):
-            per_second_key = self.key_separator.join([key, self.per_second_key_suffix])
-            rate = self.stat_rate_of_change(key)
-            self.flattened_stats[per_second_key] = rate
-
-    def have_stats_changed_since_last_update(self):
-        '''
-        Returns true if any of the timestamp keys in the stats object
-        differ from the timestamp keys in the previously collected stats object.
-        '''
-
-        if not self.flattened_stats or not self.flattened_stats_previous:
-            return True
-
-        for key in self.timestamp_keys:
-            if self.flattened_stats[key] != self.flattened_stats_previous[key]:
-                return True
-
-        return False
-
-
-#
-# Gmond Interface
-#
-
-
-# global KafkateeStats object, will be
-# instantiated by metric_init()
-kafkatee_stats = None
-time_max = 15
-last_run_timestamp = 0
-
-
-def metric_handler(name):
-    """Get value of particular metric; part of Gmond interface"""
-    global kafkatee_stats
-    global time_max
-    global last_run_timestamp
-
-    seconds_since_last_run = time.time() - last_run_timestamp
-    if (seconds_since_last_run >= time_max):
-        logger.debug(
-            ('Updating kafkatee_stats since it has been {0} seconds, '
-             'which is more than tmax of {1}').format(seconds_since_last_run, time_max))
-        kafkatee_stats.update_stats()
-        last_run_timestamp = time.time()
-
-    logger.debug('metric_handler called for {0}, value: {1}'.format(
-        name, kafkatee_stats.flattened_stats[name]))
-    return kafkatee_stats.flattened_stats[name]
-
-
-def metric_init(params):
-    """Initialize; part of Gmond interface"""
-    global kafkatee_stats
-    global time_max
-    global last_run_timestamp
-
-    stats_file = params.get('stats_file', '/var/cache/kafkatee/kafkatee.stats.json')
-    key_separator = params.get('key_separator', '.')
-    ganglia_groups = params.get('groups', 'kafka')
-    time_max = int(params.get('tmax', time_max))
-
-    kafkatee_stats = KafkateeStats(stats_file, key_separator)
-    # Run update_stats() so that we'll have a list of stats keys that will
-    # be sent to ganglia. We can use this to build the descriptions dicts
-    # that metric_init is supposed to return.
-    # NOTE: This requires that the stats file already have some data in it.
-    kafkatee_stats.update_stats()
-    last_run_timestamp = time.time()
-
-    descriptions = []
-
-    # Iterate through the initial set of stats and create
-    # dictionary objects for each.
-    for key, value in kafkatee_stats.flattened_stats.items():
-        # skip any keys that are in skip_metrics
-        if key.split(key_separator)[-1] in skip_metrics:
-            continue
-
-        # value_type must be one of
-        # string | uint | float | double.
-        metric_type = infer_metric_type(value)
-        # if value is a bool, then convert it to an int
-        if isinstance(value, bool):
-            value = int(value)
-
-        if metric_type == 'uint':
-            metric_format = '%d'
-        elif metric_type == 'float' or metric_type == 'double':
-            metric_format = '%f'
-        else:
-            metric_format = '%s'
-
-        # Try to infer some useful units from the key name.
-        if key.endswith('bytes'):
-            metric_units = 'bytes'
-        elif key.endswith('tx'):
-            metric_units = 'transmits'
-        elif key.endswith('rx'):
-            metric_units = 'receives'
-        elif key.endswith('msgs'):
-            metric_units = 'messages'
-        elif key.endswith('err') or key.endswith('errs'):
-            metric_units = 'errors'
-        elif 'rtt' in key and 'cnt' not in key:
-            metric_units = 'microseconds'
-        else:
-            metric_units = ''
-        if key.endswith(kafkatee_stats.per_second_key_suffix):
-            metric_units = ' '.join([metric_units, 'per second'])
-
-        descriptions.append({
-            'name': key,
-            'call_back': metric_handler,
-            'time_max': time_max,
-            'value_type': metric_type,
-            'units': metric_units,
-            'slope': 'both',
-            'format': metric_format,
-            'description': '',
-            'groups': ganglia_groups,
-        })
-
-    return descriptions
-
-
-def metric_cleanup():
-    """Teardown; part of Gmond interface"""
-    pass
-
-
-# To run tests:
-#   python -m unittest kafkatee_ganglia
-import unittest  # noqa
-
-
-class TestKafkateeGanglia(unittest.TestCase):
-
-    def setUp(self):
-        self.key_separator = '&'
-        self.kafkatee_stats = KafkateeStats('/tmp/test-kafkatee.stats.json', self.key_separator)
-
-        self.json_data = {
-            '1.1': {
-                'value1': 0,
-                'value2': 'hi',
-                '1.2': {
-                    'value3': 0.1,
-                    'value4': False,
-                }
-            },
-            '2.1': ['a', 'b'],  # noqa
-            '2.1': ['a', 'b'],  # noqa
-            # '/' should be replaced with key_separator
-            '3/1': 'nonya',
-            'notme': 'nope',
-            'kafka': {
-                'rdkafka': {
-                    'time': time.time(),
-                    'counter': {self.kafkatee_stats.counter_stats[0]: 0}
-                }
-            },
-        }
-        self.flattened_should_be = {
-            '1.1&value1': 0,
-            '1.1&valuetwo': 'hi',
-            '1.1&1.2&value3': 0.1,
-            '1.1&1.2&value4': False,
-            '2.1&0': 'a',
-            '2.1&1': 'b',
-            # '/' should be replaced with key_separator
-            '3&1': 'nonya',
-            'kafka&rdkafka&counter&{0}'.format(self.kafkatee_stats.counter_stats[0]): 0,
-            'kafka&rdkafka&time': self.json_data['kafka']['rdkafka']['time'],
-        }
-
-    def key_filter_callback(self, key):
-        if key == 'value2':
-            key = 'valuetwo'
-        if key == 'notme':
-            key = False
-
-        return key
-
-    def test_flatten_object(self):
-        flattened = flatten_object(self.json_data, self.key_separator, self.key_filter_callback)
-        self.assertEquals(flattened, self.flattened_should_be)
-
-    def test_is_counter_stat(self):
-        self.assertTrue(self.kafkatee_stats.is_counter_stat(self.kafkatee_stats.counter_stats[0]))
-        self.assertTrue(self.kafkatee_stats.is_counter_stat(
-            'whatever&it&no&matter&' + self.kafkatee_stats.counter_stats[0]))
-        self.assertFalse(self.kafkatee_stats.is_counter_stat('notone'))
-
-    def test_update_stats(self):
-        self.kafkatee_stats.update_stats(self.flattened_should_be)
-        self.assertEquals(self.kafkatee_stats.flattened_stats[
-            '1.1&valuetwo'], self.flattened_should_be['1.1&valuetwo'])
-
-        previous_value = self.kafkatee_stats.flattened_stats['1.1&valuetwo']
-        self.flattened_should_be['1.1&valuetwo'] = 1
-        self.kafkatee_stats.update_stats(self.flattened_should_be)
-        self.assertEquals(self.kafkatee_stats.flattened_stats[
-            '1.1&valuetwo'], self.flattened_should_be['1.1&valuetwo'])
-        self.assertEquals(self.kafkatee_stats.flattened_stats_previous[
-            '1.1&valuetwo'], previous_value)
-
-    def test_rate_of_change_update_stats(self):
-        counter_key = 'kafka{0}rdkafka{0}counter{0}{1}'.format(
-            self.key_separator, self.kafkatee_stats.counter_stats[0])
-        self.kafkatee_stats.update_stats(self.flattened_should_be)
-        previous_value = self.flattened_should_be[counter_key]
-
-        # increment the counter and the timestamp to make KafkateeStats calculate
-        # a new per_second rate
-        self.flattened_should_be[counter_key] += 101
-        self.flattened_should_be['kafka&rdkafka&time'] += 100.0
-        self.kafkatee_stats.update_stats(self.flattened_should_be)
-
-        self.assertEquals(
-            self.kafkatee_stats.flattened_stats_previous[counter_key], previous_value)
-        self.assertEquals(self.kafkatee_stats.flattened_stats[
-            counter_key], self.flattened_should_be[counter_key])
-        self.assertEquals(self.kafkatee_stats.flattened_stats[
-            'kafka&rdkafka&time'], self.flattened_should_be['kafka&rdkafka&time'])
-        per_second_key = self.key_separator.join(
-            [counter_key, self.kafkatee_stats.per_second_key_suffix])
-
-        rate_should_be = (
-            self.flattened_should_be[counter_key] -
-            self.kafkatee_stats.flattened_stats_previous[counter_key]) / 100.0
-        self.assertEquals(self.kafkatee_stats.flattened_stats[per_second_key], rate_should_be)
-
-
-def generate_pyconf(
-        module_name, metric_descriptions, params={}, collect_every=15, time_threshold=15):
-    '''
-    Generates a pyconf file including all of the metrics in metric_descriptions.
-    '''
-
-    params_string = ''
-    params_keys = params.keys()
-    params_keys.sort()
-    for key in params_keys:
-        value = params[key]
-        if isinstance(value, str):
-            value = '"{0}"'.format(value)
-        else:
-            value = str(value)
-        params_string += '    param %s { value = %s }\n' % (key, value)
-
-    metrics_string = ''
-    metric_descriptions.sort()
-    for description in metric_descriptions:
-        metrics_string += """
-    metric {
-        name = "%(name)s"
-    }
-""" % description
-
-    return """# %(module_name)s plugin for Ganglia Monitor, automatically generated config file
-modules {
-    module {
-        name = "%(module_name)s"
-        language = "python"
-%(params_string)s
-    }
-}
-collection_group {
-    collect_every = %(collect_every)s
-    time_threshold = %(time_threshold)s
-%(metrics_string)s
-}
-""" % {'module_name': module_name,
-       'params_string': params_string,
-       'collect_every': collect_every,
-       'time_threshold': time_threshold,
-       'metrics_string': metrics_string
-       }
-
-
-if __name__ == '__main__':
-    # When invoked as standalone script, run a self-test by querying each
-    # metric descriptor and printing it out.
-
-    cmdline = optparse.OptionParser(usage="usage: %prog [options] statsfile")
-    cmdline.add_option(
-        '--generate-pyconf', '-g', action='store_true', default=False,
-        help='If set, a .pyconf file will be output with flattened metrics key from statsfile.')
-    cmdline.add_option(
-        '--tmax', '-t', action='store', default=15,
-        help='time_max for ganglia python module metrics.')
-    cmdline.add_option(
-        '--key-separator', '-k', dest='key_separator', default='.',
-        help=(
-            'Key separator for flattened json object key name. '
-            "Default: '.'  '/' is not allowed."))
-    cmdline.add_option('--debug', '-D', action='store_true', default=False,
-                       help='Provide more verbose logging for debugging.')
-
-    cli_options, arguments = cmdline.parse_args()
-
-    if (len(arguments) != 1):
-        cmdline.print_help()
-        cmdline.error("Must supply statsfile argument.")
-
-    cli_options.stats_file = arguments[0]
-
-    # Turn the optparse.Values object into a regular dict
-    # so we can pass it to metric_init
-    params = vars(cli_options)
-
-    # If we are to generate the pyconf file from
-    # data in stats_file, do so now.
-    if cli_options.generate_pyconf:
-        print(generate_pyconf(
-            'kafkatee',
-            metric_init(params),
-            # set stats_file and tmax from cli options
-            {
-                'tmax': cli_options.tmax,
-                'stats_file': cli_options.stats_file,
-            },
-            # collect_every == tmax
-            cli_options.tmax,
-            # time_threshold == tmax
-            cli_options.tmax,
-        ))
-
-    # Else print out values of metrics in a loop.
-    else:
-        # use logger to print to stdout
-        stdout_handler = logging.StreamHandler(sys.stdout)
-        formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
-        stdout_handler.setFormatter(formatter)
-        logger.addHandler(stdout_handler)
-
-        if (cli_options.debug):
-            logger.setLevel(logging.DEBUG)
-
-        metric_descriptions = metric_init(params)
-        while True:
-            print('----------')
-            for metric in metric_descriptions:
-                value = metric['call_back'](metric['name'])
-                print("{0} => {1} {2}".format(metric['name'], value, metric['units']))
-            time.sleep(float(params['tmax']))
diff --git a/manifests/monitoring.pp b/manifests/monitoring.pp
deleted file mode 100644
index 7ff86db..0000000
--- a/manifests/monitoring.pp
+++ /dev/null
@@ -1,36 +0,0 @@
-# == Class kafkatee::monitoring
-# Installs kafkatee python ganglia module.
-#
-class kafkatee::monitoring(
-    $ensure = 'present'
-)
-{
-    Class['kafkatee'] -> Class['kafkatee::monitoring']
-
-    $log_statistics_file     = $::kafkatee::log_statistics_file
-    $log_statistics_interval = $::kafkatee::log_statistics_interval
-
-    file { '/usr/lib/ganglia/python_modules/kafkatee.py':
-        source  => 'puppet:///modules/kafkatee/kafkatee_ganglia.py',
-        require => Package['ganglia-monitor'],
-        notify  => Service['ganglia-monitor'],
-    }
-
-    # Metrics reported by kafkatee_ganglia.py are
-    # not known until the kafkatee.stats.json file is
-    # parsed. Run it with the --generate-pyconf option to
-    # generate the .pyconf file now.
-    exec { 'generate-kafkatee.pyconf':
-        require => File['/usr/lib/ganglia/python_modules/kafkatee.py'],
-        command => "/usr/bin/python /usr/lib/ganglia/python_modules/kafkatee.py --generate --tmax=${log_statistics_interval} ${log_statistics_file} > /etc/ganglia/conf.d/kafkatee.pyconf.new",
-        onlyif  => "/usr/bin/test -f ${log_statistics_file}",
-    }
-
-    exec { 'replace-kafkatee.pyconf':
-        cwd     => '/etc/ganglia/conf.d',
-        path    => '/bin:/usr/bin',
-        unless  => 'diff -q kafkatee.pyconf.new kafkatee.pyconf && rm kafkatee.pyconf.new',
-        command => 'mv kafkatee.pyconf.new kafkatee.pyconf',
-        require => Exec['generate-kafkatee.pyconf'],
-        notify  => Service['ganglia-monitor'],
-    }
-}

--
To view, visit https://gerrit.wikimedia.org/r/396088
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If1ed1245c6e3c09b101e0b06035a80e2b4c38581
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet/kafkatee
Gerrit-Branch: master
Gerrit-Owner: Dzahn <dz...@wikimedia.org>