Ori.livneh has uploaded a new change for review. https://gerrit.wikimedia.org/r/204984
Change subject: coal: use set_wakeup_fd & poll for interval timer ...................................................................... coal: use set_wakeup_fd & poll for interval timer Change-Id: I4757a6a992e998d6b16d13fe5f6fa9fc636944e1 --- M modules/webperf/files/coal 1 file changed, 89 insertions(+), 140 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/operations/puppet refs/changes/84/204984/1 diff --git a/modules/webperf/files/coal b/modules/webperf/files/coal index b4ad7dd..3435b55 100755 --- a/modules/webperf/files/coal +++ b/modules/webperf/files/coal @@ -20,24 +20,20 @@ limitations under the License. """ -from __future__ import division - import sys reload(sys) sys.setdefaultencoding('utf-8') import argparse import collections -import heapq -import logging -import logging.handlers +import errno +import fcntl +import os import os.path +import select import signal -import sys -import threading -import time +import syslog -import numpy import whisper import zmq @@ -53,153 +49,106 @@ ) +def median(population): + population = list(sorted(population)) + length = len(population) + if length == 0: + raise ValueError('Cannot compute median of empty list.') + index = (length - 1) // 2 + if length % 2: + return population[index] + middle_terms = population[index] + population[index + 1] + return middle_terms / 2.0 + +def create_interval_timer_fd(initial_delay, interval_seconds): + r, w = os.pipe() + flags = fcntl.fcntl(w, fcntl.F_GETFL, 0) + fcntl.fcntl(w, fcntl.F_SETFL, flags | os.O_NONBLOCK) + signal.set_wakeup_fd(w) + signal.signal(signal.SIGALRM, cmp) + signal.setitimer(signal.ITIMER_REAL, initial_delay, interval_seconds) + return r + + class WhisperLogger(object): - def __init__(self, args=None): - self.args = self.get_argument_parser().parse_args(args) - self.log = self.get_logger() - self.kill_switch = threading.Event() - self.windows = collections.defaultdict(SlidingWindow) + arg_parser = argparse.ArgumentParser() + arg_parser.add_argument( + '--whisper-dir', + default=os.getcwd(), + help='Path for Whisper files. Defaults to working directory.' + ) + arg_parser.add_argument( + 'endpoint', + help='EventLogging endpoint URL.' + ) - def get_connection(self): - socket = zmq.Context().socket(zmq.SUB) - socket.connect(self.args.endpoint) - socket.linger = 0 - socket.subscribe = b'' - return socket + def __init__(self): + self.args = self.arg_parser.parse_args() + self.windows = collections.defaultdict(collections.deque) + syslog.openlog('coal', syslog.LOG_PERROR, syslog.LOG_DAEMON) + + def get_whisper_file(self, metric): + return os.path.join(self.args.whisper_dir, metric + '.wsp') + + def create_whisper_files(self): + archives = [(WINDOW_SPAN, UPDATE_INTERVAL)] + for metric in METRICS: + try: + whisper.create(self.get_whisper_file(metric), archives) + except whisper.InvalidConfiguration as e: + pass # Already exists. def run(self): - self.log.info('starting up') - self.log.info('creating whisper files') self.create_whisper_files() - self.log.info('connecting to %s', self.args.endpoint) - conn = self.get_connection() - - for signum in (signal.SIGALRM, signal.SIGINT): - signal.signal(signum, self.handle_signal) + syslog.syslog('Connecting to %s.' % self.args.endpoint) + socket = zmq.Context().socket(zmq.SUB) + socket.connect(self.args.endpoint) + socket.subscribe = b'' # Accumulate a full WINDOW_SPAN's worth of samples before the first # update. After that, update every UPDATE_INTERVAL seconds. - signal.setitimer(signal.ITIMER_REAL, WINDOW_SPAN, UPDATE_INTERVAL) + # timer_fd = create_interval_timer_fd(WINDOW_SPAN, UPDATE_INTERVAL) + timer_fd = create_interval_timer_fd(10, 20) - self.log.info('entering main loop') + poll = select.epoll() + poll.register(socket.fd, select.EPOLLIN) + poll.register(timer_fd, select.EPOLLIN) + + syslog.syslog('Entering main loop.') while 1: try: - meta = conn.recv_json() - except KeyboardInterrupt: - # PyZMQ 13.0.x raises EINTR as KeyboardInterrupt. - # See <https://github.com/zeromq/pyzmq/pull/338>. - # To work around this, we set a flag in a SIGINT handler. - # The flag lets us distinguish "real" CTRL-C from EINTR. - if self.kill_switch.is_set(): - self.log.info('terminating on ctrl-c') - sys.exit(0) - if meta['schema'] == 'NavigationTiming': - event = meta['event'] - for metric in METRICS: - value = event.get(metric) - if value: - self.windows[metric].push(value) + for fd, _ in poll.poll(): + if fd == timer_fd: + os.read(timer_fd, 1) + self.flush_data() + if fd == socket.fd: + while socket.events & zmq.POLLIN: + meta = socket.recv_json() + self.handle_event(meta) + except IOError as e: + if e.errno != errno.EINTR: + raise - def get_argument_parser(self): - parser = argparse.ArgumentParser() - parser.add_argument( - '-v', '--verbose', - action='store_const', - const=logging.DEBUG, - default=logging.INFO, - dest='log_level', - help='Increase verbosity of output.' - ) - parser.add_argument( - '--log-file', - default=None, - type=os.path.abspath, - help='Specify a file to log output. Disabled by default.' - ) - parser.add_argument( - '--whisper-path', - default=os.getcwd(), - type=os.path.abspath, - help='Path for whisper files' - ) - parser.add_argument('endpoint', help='EventLogging endpoint URL') - return parser - - def get_logger(self): - """Get a logging.Logger instance for this application.""" - logger = logging.getLogger('coal') - handlers = [logging.StreamHandler(stream=sys.stderr)] - if self.args.log_file: - handlers.append(logging.handlers.RotatingFileHandler( - self.args.log_file, backupCount=10, maxBytes=5e7)) - formatter = logging.Formatter('[%(asctime)s] %(message)s') - logger.setLevel(self.args.log_level) - for handler in handlers: - handler.setFormatter(formatter) - handler.setLevel(logging.INFO) - logger.addHandler(handler) - return logger - - def handle_signal(self, signum, frame): - if signum == signal.SIGINT: - # Set the kill switch, so the try / except block in the main loop - # knows that the KeyboardInterrupt exception is real and not an - # EINTR from zmq (see <https://github.com/zeromq/pyzmq/pull/338>). - self.kill_switch.set() - elif signum == signal.SIGALRM: - self.flush_data() + def handle_event(self, meta): + if meta['schema'] != 'NavigationTiming': + return + timestamp = meta['timestamp'] + event = meta['event'] + for metric in METRICS: + value = event.get(metric) + if value: + window = self.windows[metric] + window.append((timestamp, value)) def flush_data(self): - """Write updates to Whisper files.""" - while self.windows: - metric, window = self.windows.popitem() - wsp = os.path.join(self.args.whisper_path, '%s.wsp' % metric) - value = window.current_value() - whisper.update(wsp, value) - self.log.info('%10s - %10s', metric, value) - - def create_whisper_files(self): - """Create Whisper file for each metric. Do nothing if file exists.""" - for metric in METRICS: - wsp = os.path.join(self.args.whisper_path, '%s.wsp' % metric) - try: - whisper.create(wsp, [(UPDATE_INTERVAL, RETENTION)]) - except whisper.InvalidConfiguration as e: - if 'already exists' not in e.message: - raise - else: - self.log.info('Initialized empty Whisper file %s', wsp) - - -class SlidingWindow(object): - - def __init__(self, span=300, aggregate_func=numpy.median): - """Initialize. `span` is the size of the window, in seconds.""" - self.heap = [] - self.span = span - self.last_update = 0 - self.aggregate = aggregate_func - - def prune(self): - while self.heap and self.last_update - self.heap[0][0] > self.span: - heapq.heappop(self.heap) - - def push(self, value, timestamp=None): - """Add an item to the window.""" - if timestamp is None: - timestamp = time.time() - if timestamp > self.last_update: - self.last_update = timestamp - heapq.heappush(self.heap, (timestamp, value)) - self.prune() - - def current_value(self): - """Returns a copy of the list of items in the window.""" - self.prune() - values = [value for timestamp, value in self.heap] - return self.aggregate(values) + for metric, window in sorted(self.windows.items()): + while window[-1][0] - window[0][0] > WINDOW_SPAN: + window.popleft() + current_value = median(value for timestamp, value in window) + whisper.update(self.get_whisper_file(metric), current_value) + syslog.syslog('%s: %d' % (metric, current_value)) if __name__ == '__main__': -- To view, visit https://gerrit.wikimedia.org/r/204984 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I4757a6a992e998d6b16d13fe5f6fa9fc636944e1 Gerrit-PatchSet: 1 Gerrit-Project: operations/puppet Gerrit-Branch: production Gerrit-Owner: Ori.livneh <o...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits