Ottomata has uploaded a new change for review.

Change subject: Using custom ganglia module instead of Logster.

Using custom ganglia module instead of Logster.

I never got logster to work with positive counter values properly,
so rather than continue hammering on that, here is an attempt
at a custom ganglia module instead of a nice generic abstraction :/

Change-Id: I8b5b427afd04b14621c38cae6d5b3a3627e41612
D files/
A files/
M manifests/monitoring.pp
A templates/varnishkafka_ganglia.pyconf.erb
4 files changed, 575 insertions(+), 130 deletions(-)

  git pull ssh:// 

diff --git a/files/ b/files/
deleted file mode 100644
index d0c79bf..0000000
--- a/files/
+++ /dev/null
@@ -1,100 +0,0 @@
-###  VarnishkafkaLogster is a subclass of JsonLogster.
-###  It is meant to parse varnishkafka
-###  (
-###  JSON statistics.
-###  Example:
-###  sudo ./logster --dry-run --output=ganglia VarnishkafkaLogster 
-from logster.parsers.JsonLogster import JsonLogster
-from logster.logster_helper import MetricObject
-class VarnishkafkaLogster(JsonLogster):
-    # Default ganglia slope is 'both', aka GAUGE.
-    # These keys should be given a slope of
-    # 'positive' aka COUNTER.
-    counter_metrics = [
-        'tx',
-        'txbytes',
-        'txerrs',
-        'txmsgs',
-        'rx',
-        'rxbytes'
-        'rxerrs',
-        'kafka_drerr',
-        'scratch_toosmall',
-        'txerr',
-        'trunc',
-        'scratch_tmpbufs',
-    ]
-    # metric keys to skip.
-    skip_metrics = [
-        'app_offset',
-        'commited offset',
-        'desired',
-        'eof_offset',
-        'fetch_state',
-        'fetchq_cnt',
-        'fetchq_cnt',
-        'leader',
-        'lp_curr'
-        'name',
-        'next_offset',
-        'nodeid',
-        'partition',
-        'query_offset',
-        'seq',
-        'time',
-        'topic',
-        'toppars',
-        'ts',
-        'unknown',
-    ]
-    def get_metric_object(self, metric_name, metric_value):
-        '''
-        Overrides JsonLogster's get_metric_object() method to
-        manually set slope to positive for counter metrics.
-        '''
-        metric_object = JsonLogster.get_metric_object(self, metric_name, 
-        metric_slope = ''
-        if metric_name.split(self.key_separator)[-1] in 
-            metric_slope = 'positive'
-        metric_object.slope = metric_slope
-        return metric_object
-    def key_filter(self, key):
-        '''
-        Overrides JsonLogster's key_filter method to
-        filter out irrelevant metrics, and to transform
-        the keys of some to make them more readable.
-        '''
-        if key in VarnishkafkaLogster.skip_metrics:
-            return False
-        # prepend appropriate rdkafka or varnishkafka to the key,
-        # depending on where the metric has come from.
-        elif key == 'varnishkafka':
-            key = 'kafka{0}varnishkafka'.format(self.key_separator)
-        elif key == 'kafka':
-            key = 'kafka{0}rdkafka'.format(self.key_separator)
-        # don't send any bootstrap rdkafka metrics
-        elif 'bootstrap' in key:
-            return False
-        # replace any key separators in the key with '-'
-        elif self.key_separator in key:
-            # this won't do anything if key_separator is '-'
-            key = key.replace(self.key_separator, '-')
-        # don't send anything that starts with -
-        elif key.startswith('-'):
-            return False
-        return key
diff --git a/files/ b/files/
new file mode 100644
index 0000000..8e4af9e
--- /dev/null
+++ b/files/
@@ -0,0 +1,554 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+    Gmond module for posting metrics collected by a logster parser.
+    :copyright: (c) 2013 Wikimedia Foundation
+    :author: Andrew Otto <>
+    :license: GPLv2+
+# for true division when calculating rates of change
+from __future__ import division
+import optparse
+import time
+import json
+import logging
+import sys
+import os
+import copy
+logger = logging.getLogger('varnishkafka_ganglia')
+# metric keys to skip reporting to ganglia
+skip_metrics = [
+    'app_offset',
+    'commited offset',
+    'desired',
+    'eof_offset',
+    'fetch_state',
+    'fetchq_cnt',
+    'fetchq_cnt',
+    'leader',
+    'lp_curr'
+    'name',
+    'next_offset',
+    'nodeid',
+    'partition',
+    'query_offset',
+    'seq',
+    'time',
+    'topic',
+    'toppars',
+    'ts',
+    'unknown',
+def flatten_object(node, separator='.', key_filter_callback=None, 
+    '''
+    Recurses through dicts and/or lists and flattens them
+    into a single level dict of key: value pairs.  Each
+    key consists of all of the recursed keys joined by
+    separator.  If key_filter_callback is callable,
+    it will be called with each key.  It should return
+    either a new key which will be used in the final full
+    key string, or False, which will indicate that this
+    key and its value should be skipped.
+    '''
+    flattened = {}
+    try:
+        iterator = node.iteritems()
+    except AttributeError:
+        iterator = enumerate(node)
+    for key, child in iterator:
+        # If key_filter_callback was provided,
+        # then call it on the key.  If the returned
+        # key is false, then, we know to skip it.
+        if callable(key_filter_callback):
+            key = key_filter_callback(key)
+        if key is False:
+            continue
+        # append this key to the end of all keys seen so far
+        all_keys = parent_keys + [str(key)]
+        if hasattr(child, '__iter__'):
+            # merge the child items all together
+            flattened.update(flatten_object(child, separator, 
key_filter_callback, all_keys))
+        else:
+            # '/' is  not allowed in key names.
+            # Ganglia writes files based on key names
+            # and doesn't escape these in the path.
+            final_key = separator.join(all_keys).replace('/', separator)
+            flattened[final_key] = child
+    return flattened
+def tail(filename, n=2):
+    '''
+    Tails the last n lines from filename and returns them in a list.
+    '''
+    cmd = '/usr/bin/tail -n {0} {1}'.format(n, filename)
+    f = os.popen(cmd)
+    lines ='\n')
+    f.close()
+    return lines
+def infer_metric_type(value):
+    '''
+    Infers ganglia type from the
+    variable type of value.
+    '''
+    if isinstance(value, float):
+        metric_type = 'float'
+    # use uint for int and long.
+    # If bool, use 'string'. (bool is a subtype of int)
+    elif isinstance(value, int) or isinstance(value, long):
+        metric_type = 'unit'
+    else:
+        metric_type = 'string'
+    return metric_type
+class VarnishkafkaStats(object):
+    '''
+    Class representing most recent varnishkafka stats found
+    in the varnishkafka.stats.json file.  Calling update_stats()
+    will cause this class to read the most recent JSON objects from
+    this file, and parse them into a flattened stats dict suitable
+    for easy querying to send to ganglia.
+    '''
+    # Any key that ends with one of these
+    # will cause this update_stats() to
+    # calculate a rate of change for this stat
+    # since the last run and insert it into
+    # flattened_stats dict suffixed by
+    # per_second_key_suffix.
+    counter_stats = [
+        'tx',
+        'txbytes',
+        'txerrs',
+        'txmsgs',
+        'rx',
+        'rxbytes'
+        'rxerrs',
+        'kafka_drerr',
+        'scratch_toosmall',
+        'txerr',
+        'trunc',
+        'scratch_tmpbufs',
+    ]
+    per_second_key_suffix = 'per_second'
+    def __init__(self, 
+        self.stats_file               = stats_file
+        self.key_separator            = key_separator
+        self.flattened_stats          = {}
+        self.flattened_stats_previous = {}
+        # varnishkafka outputs two types of distinct JSON objects
+        # each time it outputs stats.  One is for varnishkafka stats,
+        # the other is for librdkafka stats.  We will read this
+        # many lines from the end of the stats file per gmond check.
+        self.distinct_lines_per_interval = 2
+        # timestamp keys from each distinct json object as
+        # they will appear in the self.flattened_stats dict.
+        # These are used for detecting changes in json data
+        # in the stats_file.
+        self.timestamp_keys = [
+            'kafka{0}varnishkafka{0}time'.format(self.key_separator),
+            'kafka{0}rdkafka{0}time'.format(self.key_separator)
+        ]
+    def key_filter(self, key):
+        '''
+        Filters out irrelevant varnishkafka metrics, and to transform
+        the keys of some to make them more readable.
+        '''
+        # prepend appropriate rdkafka or varnishkafka to the key,
+        # depending on where the metric has come from.
+        if key == 'varnishkafka':
+            key = 'kafka{0}varnishkafka'.format(self.key_separator)
+        elif key == 'kafka':
+            key = 'kafka{0}rdkafka'.format(self.key_separator)
+        # don't send any bootstrap rdkafka metrics
+        elif 'bootstrap' in key:
+            return False
+        # replace any key separators in the key with '-'
+        elif self.key_separator in key:
+            # this won't do anything if key_separator is '-'
+            key = key.replace(self.key_separator, '-')
+        # don't send anything that starts with -
+        elif key.startswith('-'):
+            return False
+        return key
+    def is_counter_stat(self, key):
+        '''
+        Returns true if this (flattened or leaf) key is a counter stat,
+        meaning it should always increase during a varnishkafka instance's
+        lifetime.
+        '''
+        return (key.split(self.key_separator)[-1] in self.counter_stats)
+    def tail_stats_file(self):
+        '''
+        Returns the latest distinct_lines_per_interval lines from stats_file 
as a list.
+        '''
+'Reading latest varnishkafka stats from 
+        return tail(self.stats_file, self.distinct_lines_per_interval)
+    def get_latest_stats_from_file(self):
+        '''
+        Reads the latest stats out of stats_file and returns the parsed JSON 
+        '''
+        lines = self.tail_stats_file()
+        stats = {}
+        for line in lines:
+            stats.update(flatten_object(json.loads(line), self.key_separator, 
+        return stats
+    def update_stats(self, stats=None):
+        '''
+        Reads the latest stats out of stats_file and updates the stats
+        attribute of this class.  If the data has changed since the last
+        update, new counter rate of change per seconds stats will also be
+        calculated.
+        '''
+        # Save the current stats into the previous stats
+        # objects so we can compute rate of change over durations
+        # for counter metrics.
+        self.flattened_stats_previous = copy.deepcopy(self.flattened_stats)
+        # If stats weren't manually passed in, then go ahead and read the
+        # most recent stats out of the stats_file.
+        if not stats:
+            stats = self.get_latest_stats_from_file()
+        self.flattened_stats.update(stats)
+        # If the stats we have now have actually changed since the
+        # last time we updated the stats, then go ahead and compute
+        # new per_second change rates for each counter stat.
+        if self.have_stats_changed_since_last_update():
+            logger.debug('varnishkafka stats have changed since last update.')
+            self.update_counter_rate_stats()
+        else:
+            logger.debug('varnishkafka stats have not changed since last 
+    def stat_rate_of_change(self, key):
+        '''
+        Given a value for stat key name, computes (current stat - previous 
stat) / update interval.
+        Update interval is computed from the timestamp that comes with the json 
stats, not
+        the time since update_stats() was last called.
+        '''
+        key_list = key.split(self.key_separator)
+        timestamp_key = '{1}{0}{2}{0}time'.format(self.key_separator, 
key_list[0], key_list[1])
+        # if we don't yet have a previous value from which to calculate a
+        # rate, just return 0 for now
+        if not self.flattened_stats or not self.flattened_stats_previous or 
timestamp_key not in self.flattened_stats_previous:
+            return 0.0
+        interval = self.flattened_stats[timestamp_key] - 
+        # if the timestamps are the same, then just return 0.
+        if interval == 0:
+            return 0.0
+        # else calculate the per second rate of change
+        return (self.flattened_stats[key] - 
self.flattened_stats_previous[key]) / interval
+    def update_counter_rate_stats(self):
+        '''
+        For each counter stat, this will add an extra stat
+        to the stats for rate of change per second.
+        '''
+        for key, value in self.flattened_stats.items():
+            if self.is_counter_stat(key):
+                per_second_key = '{0}{1}{2}'.format(key, self.key_separator, 
+                rate = self.stat_rate_of_change(key)
+                self.flattened_stats[per_second_key] = rate
+    def have_stats_changed_since_last_update(self):
+        '''
+        Returns true if either of the timestamp keys in the stats objects
+        differ from the timestamp keys in the previously collected stats 
+        '''
+        if not self.flattened_stats or not self.flattened_stats_previous:
+            return True
+        for key in self.timestamp_keys:
+            if self.flattened_stats[key] != self.flattened_stats_previous[key]:
+                return True
+        return False
+# Gmond Interface
+# global VarnishkafkaStats object, will be
+# instantiated by metric_init()
+varnishkafka_stats = None
+time_max = 15
+last_run_timestamp = 0
+def metric_handler(name):
+    """Get value of particular metric; part of Gmond interface"""
+    global varnishkafka_stats
+    global time_max
+    global last_run_timestamp
+    seconds_since_last_run = time.time() - last_run_timestamp
+    if (seconds_since_last_run >= time_max):
+        logger.debug('Updating varnishkafka_stats since it has been {0} 
seconds, which is more than tmax of {1}'.format(seconds_since_last_run, 
+        varnishkafka_stats.update_stats()
+        last_run_timestamp = time.time()
+    logger.debug('metric_handler called for {0}'.format(name))
+    return varnishkafka_stats.flattened_stats[name]
+def metric_init(params):
+    """Initialize; part of Gmond interface"""
+    global varnishkafka_stats
+    global time_max
+    global last_run_timestamp
+    stats_file     = params.get('stats_file', 
+    key_separator  = params.get('key_separator', '.')
+    ganglia_groups = params.get('groups', 'kafka')
+    time_max       = int(params.get('tmax', time_max))
+    varnishkafka_stats = VarnishkafkaStats(stats_file, key_separator)
+    # Run update_stats() so that we'll have a list of stats keys that will
+    # be sent to ganglia.  We can use this to build the descriptions dicts
+    # that metric_init is supposed to return.
+    # NOTE:  This requires that the stats file already have some data in it.
+    varnishkafka_stats.update_stats()
+    last_run_timestamp = time.time()
+    descriptions = []
+    # run the initial get_metric_objects():
+    # to run logster.parse and read in all of the
+    # MetricObjects from the logfile.
+    # iterate through the initial set of stats and create
+    # dictionary objects for each.
+    for key, value in varnishkafka_stats.flattened_stats.items():
+        # skip any keys that are in skip_metrics
+        if key.split(key_separator)[-1] in skip_metrics:
+            continue
+        # value_type must be one of
+        # string | uint | float | double.
+        # If 'int' is in the metric.type
+        # from the logster parser, then use
+        # 'uint'
+        metric_type = infer_metric_type(value)
+        # if value is a bool, then convert it to an int
+        if isinstance(value, bool):
+            value = int(value)
+        if metric_type == 'uint':
+            metric_format = '%d'
+        elif metric_type == 'float' or metric_type == 'double':
+            metric_format = '%f'
+        else:
+            metric_format = '%s'
+        # try to infer some useful units from the key name.
+        if 'bytes' in key:
+            metric_units = 'bytes'
+        elif 'tx' in key:
+            metric_units = 'transmits'
+        elif 'rx' in key:
+            metric_units = 'receives'
+        elif 'rtt' in key:
+            metric_units = 'microseconds'
+        else:
+            metric_units = ''
+        if key.endswith(varnishkafka_stats.per_second_key_suffix):
+            metric_units = ' '.join([metric_units, 'per second'])
+        descriptions.append({
+            'name':         key,
+            'call_back':    metric_handler,
+            'time_max':     time_max,
+            'value_type':   metric_type,
+            'units':        metric_units,
+            'slope':        'both',
+            'format':       metric_format,
+            'description':  '',
+            'groups':       ganglia_groups,
+        })
+    return descriptions
+def metric_cleanup():
+    """Teardown; part of Gmond interface"""
+    pass
+# To run tests:
+#   python -m unittest varnishkafka_ganglia
+import unittest
+class TestVarnishkafkaGanglia(unittest.TestCase):
+    def setUp(self):
+        self.key_separator = '&'
+        self.varnishkafka_stats = 
VarnishkafkaStats('/tmp/test-varnishkafka.stats.json', self.key_separator)
+        # self.json_line = '{     "1.1": {         "value1": 0,         
"value2": "hi",         "1.2": {             "value3": 0.1,             
"value4": false         }     },     "2.1": ["a","b"] }'
+        self.json_data = {
+            '1.1': {
+                'value1': 0,
+                'value2': 'hi',
+                '1.2': {
+                    'value3': 0.1,
+                    'value4': False,
+                }
+            },
+            '2.1': ['a','b'],
+            '2.1': ['a','b'],
+            # '/' should be replaced with key_separator
+            '3/1': 'nonya',
+            'notme': 'nope',
+            'kafka': {
+                'varnishkafka': { 
+                    'time': time.time(),
+                    'counter': { self.varnishkafka_stats.counter_stats[0]: 0 },
+                },
+                'rdkafka': { 'time': time.time() }
+            },
+        }
+        self.flattened_should_be = {
+            '1.1&value1': 0,
+            '1.1&valuetwo': 'hi',
+            '1.1&1.2&value3': 0.1,
+            '1.1&1.2&value4': False,
+            '2.1&0': 'a',
+            '2.1&1': 'b',
+            # '/' should be replaced with key_separator
+            '3&1': 'nonya',
+            'kafka&varnishkafka&time': 
+            'kafka&rdkafka&time': self.json_data['kafka']['rdkafka']['time'],
+        }
+    def key_filter_callback(self, key):
+        if key == 'value2':
+            key = 'valuetwo'
+        if key == 'notme':
+            key = False
+        return key
+    def test_flatten_object(self):
+        flattened = flatten_object(self.json_data, self.key_separator, 
+        self.assertEquals(flattened, self.flattened_should_be)
+    def test_is_counter_stat(self):
 + self.varnishkafka_stats.counter_stats[0]))
+        self.assertFalse(self.varnishkafka_stats.is_counter_stat('notone'))
+    def test_update_stats(self):
+        self.varnishkafka_stats.update_stats(self.flattened_should_be)
+        previous_value = 
+        self.flattened_should_be['1.1&valuetwo'] = 1
+        self.varnishkafka_stats.update_stats(self.flattened_should_be)
+    def test_rate_of_change_update_stats(self):
+        counter_key = 
+        self.varnishkafka_stats.update_stats(self.flattened_should_be)
+        previous_value = self.flattened_should_be[counter_key]
+        # increment the counter and the timestamp to make VarnishkafkaStats 
calculate a new per_second rate
+        self.flattened_should_be[counter_key] += 101
+        self.flattened_should_be['kafka&varnishkafka&time'] += 100.0
+        self.varnishkafka_stats.update_stats(self.flattened_should_be)
+        per_second_key = '{0}{1}{2}'.format(counter_key, self.key_separator, 
+        rate_should_be = (self.flattened_should_be[counter_key] - 
self.varnishkafka_stats.flattened_stats_previous[counter_key]) / 100.0
+if __name__ == '__main__':
+    # When invoked as standalone script, run a self-test by querying each
+    # metric descriptor and printing it out.
+    ch = logging.StreamHandler(sys.stdout)
+    formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
+    ch.setFormatter(formatter)
+    logger.addHandler(ch)
+    cmdline = optparse.OptionParser(usage="usage: %prog [options] statsfile")
+    cmdline.add_option('--tmax', '-t', action='store', default=15,
+        help='time_max for ganglia python module metrics.')
+    cmdline.add_option('--key-separator', '-k', dest='key_separator', 
+        help='Key separator for flattened json object key name. Default: \'.\' 
 \'/\' is not allowed.')
+    cmdline.add_option('--debug', '-D', action='store_true', default=False,
+                        help='Provide more verbose logging for debugging.')
+    cli_options, arguments = cmdline.parse_args()
+    if (len(arguments) != 1):
+        cmdline.print_help()
+        cmdline.error("Must supply statsfile argument.")
+    if (cli_options.debug):
+        logger.setLevel(logging.DEBUG)
+    cli_options.stats_file  = arguments[0]
+    # Turn the optparse.Value object into a regular dict
+    # so we can pass it to metric_init
+    params = vars(cli_options)
+    # When invoked as standalone script, run a self-test by querying each
+    # metric descriptor and printing it out.
+    print('----------')
+    metric_descriptions = metric_init(params)
+    while True:
+        for metric in metric_descriptions:
+            value = metric['call_back'](metric['name'])
+            print("{0} => {1} {2}".format(metric['name'], value, 
+        time.sleep(float(params['tmax']))
+        print('----------')
diff --git a/manifests/monitoring.pp b/manifests/monitoring.pp
index ba9c976..0e78eba 100644
--- a/manifests/monitoring.pp
+++ b/manifests/monitoring.pp
@@ -1,41 +1,20 @@
 # == Class varnishkafka::monitoring
-# Uses logster (
-# to tail varnishkafka.stats.json file send stats to Ganglia.
+# Installs varnishkafka python ganglia module.
-# TODO: Support more than logster ganglia output via
-# class parameters.
 class varnishkafka::monitoring(
     $ensure = 'present'
     Class['varnishkafka'] -> Class['varnishkafka::monitoring']
-    # varnishkafka monitoring is done via the logster package.
-    package { 'logster':
-        ensure => 'installed',
-        # don't bother doing this unless ganglia is installed
-        require => Package['ganglia-monitor']
+    $log_statistics_file     = $::varnishkafka::log_statistics_file
+    $log_statistics_interval = $::varnishkafka::log_statistics_interval
+    file { '/usr/lib/ganglia/python_modules/':
+        source  => 'puppet:///modules/varnishkafka/',
+        require => Package['ganglia-monitor-python']
-    # put the module in place
-    if !defined(File['/usr/local/share/logster']) {
-        file { '/usr/local/share/logster':
-            ensure => 'directory',
-        }
-    }
-    # Custom JsonLogster parser subclass to filter and transform
-    # a few varnishkafka stats JSON keys.
-    file { '/usr/local/share/logster/':
-        source  => 'puppet:///modules/varnishkafka/',
-        require => File['/usr/local/share/logster'],
-    }
-    # Run logster using the VarnishkafkaLogster parser and send updated stats 
to Ganglia.
-    $cron_command = "export PYTHONPATH=\$PYTHONPATH:/usr/local/share/logster 
&& /usr/bin/logster --output ganglia --gmetric-options='--group=kafka 
--tmax=60' VarnishkafkaLogster.VarnishkafkaLogster 
-    cron { 'varnishkafka-stats-to-ganglia':
-        ensure  => $ensure,
-        command => $cron_command,
-        minute  => '*/1',
-        require => [Package['logster'], 
+    file { '/etc/ganglia/conf.d/varnishkafka.pyconf':
+        content => template('varnishkafka/varnishkafka_ganglia.pyconf.erb')
+        require => Package['ganglia-monitor-python']
diff --git a/templates/varnishkafka_ganglia.pyconf.erb 
new file mode 100644
index 0000000..ee624be
--- /dev/null
+++ b/templates/varnishkafka_ganglia.pyconf.erb
@@ -0,0 +1,12 @@
+modules {
+  module {
+    name = "varnishkafka"
+    language = "python"
+    param stats_file { value = "<%= log_statistics_file %>" }
+    param tmax       { value = <%= log_statistics_interval %> }
+  }
+collection_group {
+  collect_every = <%= log_statistics_interval %>
+  time_threshold = <%= log_statistics_interval %>

To view, visit
To unsubscribe, visit

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8b5b427afd04b14621c38cae6d5b3a3627e41612
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet/varnishkafka
Gerrit-Branch: master
Gerrit-Owner: Ottomata <>

MediaWiki-commits mailing list

Reply via email to