Ori.livneh has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/204984

Change subject: coal: use set_wakeup_fd & poll for interval timer
......................................................................

coal: use set_wakeup_fd & poll for interval timer

Change-Id: I4757a6a992e998d6b16d13fe5f6fa9fc636944e1
---
M modules/webperf/files/coal
1 file changed, 89 insertions(+), 140 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet refs/changes/84/204984/1

diff --git a/modules/webperf/files/coal b/modules/webperf/files/coal
index b4ad7dd..3435b55 100755
--- a/modules/webperf/files/coal
+++ b/modules/webperf/files/coal
@@ -20,24 +20,20 @@
   limitations under the License.
 
 """
-from __future__ import division
-
 import sys
 reload(sys)
 sys.setdefaultencoding('utf-8')
 
 import argparse
 import collections
-import heapq
-import logging
-import logging.handlers
+import errno
+import fcntl
+import os
 import os.path
+import select
 import signal
-import sys
-import threading
-import time
+import syslog
 
-import numpy
 import whisper
 import zmq
 
@@ -53,153 +49,106 @@
 )
 
 
+def median(population):
+    """Return the median of an iterable of numbers.
+
+    With an odd number of values the middle value is returned as-is; with
+    an even number, the mean of the two middle values is returned as a
+    float.
+
+    Raises:
+        ValueError: if `population` is empty.
+    """
+    population = sorted(population)
+    length = len(population)
+    if length == 0:
+        raise ValueError('Cannot compute median of empty list.')
+    index = (length - 1) // 2
+    if length % 2:
+        return population[index]
+    return (population[index] + population[index + 1]) / 2.0
+
+
+def create_interval_timer_fd(initial_delay, interval_seconds):
+    """Return a file descriptor that becomes readable on each timer tick.
+
+    Arms ITIMER_REAL to deliver SIGALRM after `initial_delay` seconds and
+    then every `interval_seconds` seconds. Python writes a byte to the
+    wakeup fd for each signal it handles, so the returned read end of the
+    pipe can be multiplexed with other descriptors via select/poll/epoll.
+    Note that bytes are written for *any* handled signal, not only SIGALRM.
+
+    Must be called from the main thread (required by signal.signal and
+    signal.set_wakeup_fd). Replaces any previously installed wakeup fd.
+    """
+    r, w = os.pipe()
+    # The wakeup fd must be non-blocking: the C-level signal handler must
+    # never block while writing to it.
+    flags = fcntl.fcntl(w, fcntl.F_GETFL, 0)
+    fcntl.fcntl(w, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+    signal.set_wakeup_fd(w)
+    # A Python-level handler is needed so that SIGALRM is caught (and the
+    # wakeup byte written) instead of taking the default action, which
+    # terminates the process. The handler itself has nothing to do.
+    signal.signal(signal.SIGALRM, lambda signum, frame: None)
+    signal.setitimer(signal.ITIMER_REAL, initial_delay, interval_seconds)
+    return r
+
+
 class WhisperLogger(object):
 
-    def __init__(self, args=None):
-        self.args = self.get_argument_parser().parse_args(args)
-        self.log = self.get_logger()
-        self.kill_switch = threading.Event()
-        self.windows = collections.defaultdict(SlidingWindow)
+    # Command-line interface; parsed from sys.argv in __init__. The
+    # --whisper-dir default is captured when the class body executes.
+    # NOTE(review): this change renames --whisper-path to --whisper-dir and
+    # drops -v/--verbose and --log-file, but modifies no other file; confirm
+    # whatever launches coal does not still pass any of the old flags.
+    arg_parser = argparse.ArgumentParser()
+    arg_parser.add_argument(
+        '--whisper-dir',
+        default=os.getcwd(),
+        help='Path for Whisper files. Defaults to working directory.'
+    )
+    arg_parser.add_argument(
+        'endpoint',
+        help='EventLogging endpoint URL.'
+    )
 
-    def get_connection(self):
-        socket = zmq.Context().socket(zmq.SUB)
-        socket.connect(self.args.endpoint)
-        socket.linger = 0
-        socket.subscribe = b''
-        return socket
+    def __init__(self):
+        """Parse the command line, prepare sample windows and open syslog.
+
+        Arguments come from sys.argv via the class-level arg_parser.
+        """
+        parsed_args = self.arg_parser.parse_args()
+        self.args = parsed_args
+        # Per-metric deques of (timestamp, value) pairs, created on demand.
+        self.windows = collections.defaultdict(collections.deque)
+        # LOG_PERROR additionally echoes every message to stderr.
+        syslog.openlog('coal', syslog.LOG_PERROR, syslog.LOG_DAEMON)
+
+    def get_whisper_file(self, metric):
+        """Return the path of the .wsp file holding `metric`."""
+        directory = self.args.whisper_dir
+        return os.path.join(directory, metric + '.wsp')
+
+    def create_whisper_files(self):
+        """Create a Whisper file for each metric, skipping existing files.
+
+        Any other whisper.InvalidConfiguration (for example an invalid
+        archive specification) is re-raised rather than silently ignored.
+        """
+        # NOTE(review): whisper.create() expects (secondsPerPoint, points)
+        # tuples, so this declares WINDOW_SPAN-second points, UPDATE_INTERVAL
+        # of them. With one update every UPDATE_INTERVAL seconds that looks
+        # swapped -- confirm the intended archive layout.
+        archives = [(WINDOW_SPAN, UPDATE_INTERVAL)]
+        for metric in METRICS:
+            try:
+                whisper.create(self.get_whisper_file(metric), archives)
+            except whisper.InvalidConfiguration as e:
+                # whisper uses InvalidConfiguration both for "file already
+                # exists" and for a bad archive list; only the former is
+                # expected on restart.
+                if 'already exists' not in str(e):
+                    raise
 
     def run(self):
+        """Subscribe to the EventLogging endpoint and process events forever.
+
+        Incoming events are recorded via handle_event(); each time the
+        interval timer fires, flush_data() writes the current values to
+        Whisper. A single epoll loop multiplexes the ZMQ socket and the
+        timer's wakeup fd.
+        """
-        self.log.info('starting up')
-        self.log.info('creating whisper files')
         self.create_whisper_files()
 
-        self.log.info('connecting to %s', self.args.endpoint)
-        conn = self.get_connection()
-
-        for signum in (signal.SIGALRM, signal.SIGINT):
-            signal.signal(signum, self.handle_signal)
+        syslog.syslog('Connecting to %s.' % self.args.endpoint)
+        socket = zmq.Context().socket(zmq.SUB)
+        socket.connect(self.args.endpoint)
+        socket.subscribe = b''
 
         # Accumulate a full WINDOW_SPAN's worth of samples before the first
         # update. After that, update every UPDATE_INTERVAL seconds.
-        signal.setitimer(signal.ITIMER_REAL, WINDOW_SPAN, UPDATE_INTERVAL)
+        timer_fd = create_interval_timer_fd(WINDOW_SPAN, UPDATE_INTERVAL)
 
-        self.log.info('entering main loop')
+        poll = select.epoll()
+        poll.register(socket.fd, select.EPOLLIN)
+        poll.register(timer_fd, select.EPOLLIN)
+
+        syslog.syslog('Entering main loop.')
         while 1:
             try:
-                meta = conn.recv_json()
-            except KeyboardInterrupt:
-                # PyZMQ 13.0.x raises EINTR as KeyboardInterrupt.
-                # See <https://github.com/zeromq/pyzmq/pull/338>.
-                # To work around this, we set a flag in a SIGINT handler.
-                # The flag lets us distinguish "real" CTRL-C from EINTR.
-                if self.kill_switch.is_set():
-                    self.log.info('terminating on ctrl-c')
-                    sys.exit(0)
-            if meta['schema'] == 'NavigationTiming':
-                event = meta['event']
-                for metric in METRICS:
-                    value = event.get(metric)
-                    if value:
-                        self.windows[metric].push(value)
+                for fd, _ in poll.poll():
+                    if fd == timer_fd:
+                        # Drain all pending wakeup bytes so that a backlog of
+                        # ticks (or bytes written for other signals) results
+                        # in a single flush rather than several in a row.
+                        os.read(timer_fd, 4096)
+                        self.flush_data()
+                    if fd == socket.fd:
+                        # ZMQ_FD only signals state changes, so keep
+                        # receiving until ZMQ reports no pending messages.
+                        while socket.events & zmq.POLLIN:
+                            meta = socket.recv_json()
+                            self.handle_event(meta)
+            except IOError as e:
+                # A signal arriving during epoll.poll() interrupts it with
+                # EINTR; simply poll again.
+                if e.errno != errno.EINTR:
+                    raise
 
-    def get_argument_parser(self):
-        parser = argparse.ArgumentParser()
-        parser.add_argument(
-            '-v', '--verbose',
-            action='store_const',
-            const=logging.DEBUG,
-            default=logging.INFO,
-            dest='log_level',
-            help='Increase verbosity of output.'
-        )
-        parser.add_argument(
-            '--log-file',
-            default=None,
-            type=os.path.abspath,
-            help='Specify a file to log output. Disabled by default.'
-        )
-        parser.add_argument(
-            '--whisper-path',
-            default=os.getcwd(),
-            type=os.path.abspath,
-            help='Path for whisper files'
-        )
-        parser.add_argument('endpoint', help='EventLogging endpoint URL')
-        return parser
-
-    def get_logger(self):
-        """Get a logging.Logger instance for this application."""
-        logger = logging.getLogger('coal')
-        handlers = [logging.StreamHandler(stream=sys.stderr)]
-        if self.args.log_file:
-            handlers.append(logging.handlers.RotatingFileHandler(
-                self.args.log_file, backupCount=10, maxBytes=5e7))
-        formatter = logging.Formatter('[%(asctime)s] %(message)s')
-        logger.setLevel(self.args.log_level)
-        for handler in handlers:
-            handler.setFormatter(formatter)
-            handler.setLevel(logging.INFO)
-            logger.addHandler(handler)
-        return logger
-
-    def handle_signal(self, signum, frame):
-        if signum == signal.SIGINT:
-            # Set the kill switch, so the try / except block in the main loop
-            # knows that the KeyboardInterrupt exception is real and not an
-            # EINTR from zmq (see <https://github.com/zeromq/pyzmq/pull/338>).
-            self.kill_switch.set()
-        elif signum == signal.SIGALRM:
-            self.flush_data()
+    def handle_event(self, meta):
+        """Record the NavigationTiming metrics carried by one event.
+
+        Events with any other schema are ignored. Each truthy metric value
+        is appended to that metric's window, paired with the event's
+        timestamp. Falsy values (missing or zero) are skipped.
+        """
+        if meta['schema'] == 'NavigationTiming':
+            timestamp, event = meta['timestamp'], meta['event']
+            for metric in METRICS:
+                value = event.get(metric)
+                if value:
+                    self.windows[metric].append((timestamp, value))
 
     def flush_data(self):
-        """Write updates to Whisper files."""
-        while self.windows:
-            metric, window = self.windows.popitem()
-            wsp = os.path.join(self.args.whisper_path, '%s.wsp' % metric)
-            value = window.current_value()
-            whisper.update(wsp, value)
-            self.log.info('%10s - %10s', metric, value)
-
-    def create_whisper_files(self):
-        """Create Whisper file for each metric. Do nothing if file exists."""
-        for metric in METRICS:
-            wsp = os.path.join(self.args.whisper_path, '%s.wsp' % metric)
-            try:
-                whisper.create(wsp, [(UPDATE_INTERVAL, RETENTION)])
-            except whisper.InvalidConfiguration as e:
-                if 'already exists' not in e.message:
-                    raise
-            else:
-                self.log.info('Initialized empty Whisper file %s', wsp)
-
-
-class SlidingWindow(object):
-
-    def __init__(self, span=300, aggregate_func=numpy.median):
-        """Initialize. `span` is the size of the window, in seconds."""
-        self.heap = []
-        self.span = span
-        self.last_update = 0
-        self.aggregate = aggregate_func
-
-    def prune(self):
-        while self.heap and self.last_update - self.heap[0][0] > self.span:
-            heapq.heappop(self.heap)
-
-    def push(self, value, timestamp=None):
-        """Add an item to the window."""
-        if timestamp is None:
-            timestamp = time.time()
-        if timestamp > self.last_update:
-            self.last_update = timestamp
-        heapq.heappush(self.heap, (timestamp, value))
-        self.prune()
-
-    def current_value(self):
-        """Returns a copy of the list of items in the window."""
-        self.prune()
-        values = [value for timestamp, value in self.heap]
-        return self.aggregate(values)
+        """Prune each metric's window and write its median to Whisper.
+
+        Metrics are processed in sorted order. Samples more than WINDOW_SPAN
+        older than the newest sample in the same window are dropped first.
+        The newest sample always survives (assuming WINDOW_SPAN >= 0), so
+        every metric seen at least once is written on every flush.
+        """
+        # NOTE(review): pruning pops from the left, which assumes samples
+        # were appended in timestamp order; out-of-order arrivals are not
+        # reordered here.
+        for metric in sorted(self.windows):
+            window = self.windows[metric]
+            newest_timestamp = window[-1][0]
+            while newest_timestamp - window[0][0] > WINDOW_SPAN:
+                window.popleft()
+            samples = (value for _, value in window)
+            current_value = median(samples)
+            whisper.update(self.get_whisper_file(metric), current_value)
+            syslog.syslog('%s: %d' % (metric, current_value))
 
 
 if __name__ == '__main__':

-- 
To view, visit https://gerrit.wikimedia.org/r/204984
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4757a6a992e998d6b16d13fe5f6fa9fc636944e1
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Ori.livneh <o...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to