Mwalker has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/137231

Change subject: Adding StatsD Support
......................................................................

Adding StatsD Support

... and fixing some start up errors where we would crash uncontrollably
with no redis server.
... also added some startup sanity checking on file paths to make sure
they exist (otherwise we crash with no clear warning why).

Change-Id: I38be8c0f8654d98213d4dc4d849818c661922e50
---
M defaults.js
A lib/statsd.js
M lib/threads/backend.js
M lib/threads/frontend.js
M mw-ocg-service.js
M package.json
6 files changed, 315 insertions(+), 31 deletions(-)


  git pull 
ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/Collection/OfflineContentGenerator
 refs/changes/31/137231/1

diff --git a/defaults.js b/defaults.js
index 05f8d4c..3b74946 100644
--- a/defaults.js
+++ b/defaults.js
@@ -6,27 +6,31 @@
  * module.exports = function(config) { config.foo = 'bar'; }
  */
 module.exports = {
+       /** Service management thread, coordinates (re)launching threads and 
initial global setup */
        "coordinator": {
+               /** The number of frontend threads to spawn. At the moment we 
don't have good data on how
+                * many clients can be served via a thread in a production 
environment.
+                */
                "frontend_threads": 2,
+               /** The number of backend threads to spawn. These are heavy 
rendering threads and so should
+                * be set to some ratio of CPU cores. If set to "auto" the 
coordinator will launch a thread
+                * per CPU core.
+                */
                "backend_threads": "auto",
-               "runtime_user": null,
 
+               /** Public hostname of this instance for HTTP GET requests for 
locally stored content. */
                "hostname": null
        },
+       /** Configuration for the frontend HTTP server thread. You can choose 
to serve
+        * content via a local socket, or an IP address. If both are null the 
server will
+        * bind to all IP addresses.
+        */
        "frontend": {
+               "address": null,
                "socket": null,
-               "port": 17080,
-               "address": null
+               "port": 17080
        },
-       "redis": {
-               "host": "localhost",
-               "port": 6379,
-               "password": null,
-               "retry_max_delay": 60000,
-
-               "job_queue_name": "render_job_queue",
-               "status_set_name": "job_status"
-       },
+       /** Configuration for the backend bundling and rendering process 
threads. */
        "backend": {
                "bundler": {
                        "bin": "../mw-ocg-bundler/bin/mw-ocg-bundler",
@@ -45,5 +49,34 @@
                },
 
                "temp_dir": null
+       },
+       /** Redis is used in both the frontend and backend for queueing jobs 
and job
+        * metadata storage.
+        */
+       "redis": {
+               "host": "localhost",
+               "port": 6379,
+               "password": null,
+               "retry_max_delay": 60000,
+
+               "job_queue_name": "render_job_queue",
+               "status_set_name": "job_status"
+       },
+       /** Active metric reporting via the StatsD protocol. General health can 
be obtained by querying
+        * the frontend with an HTTP GET ?command=health query
+        */
+       "reporting": {
+               /** If true will send UDP packets to the StatsD server. */
+               "enable": false,
+               /** Hostname to send StatsD metrics to. */
+               "statsd_server": "localhost",
+               /** Port to send StatsD metrics to. */
+               "statsd_port": 8125,
+               /** The txstatsd daemon can have non standard behaviour. If 
you're running the
+                * ConfigurableCollector set this to true.
+                */
+               "is_txstatsd": false,
+               /** Prefix for all statistics generated by this application */
+               "prefix": "ocg.pdf."
        }
 };
diff --git a/lib/statsd.js b/lib/statsd.js
new file mode 100644
index 0000000..e33dc02
--- /dev/null
+++ b/lib/statsd.js
@@ -0,0 +1,209 @@
+/**
+ * Modified version of https://github.com/sivy/node-statsd/ for WMF specific
+ * txstatsd constraints.
+ *
+ * We've removed decrement; in txstatsd mode increment sends a meter ('m')
+ * rather than a counter ('c'). set/unique are sent unchanged in either mode.
+ *
+ * Original license to node-statsd:
+ *
+ * Copyright 2011 Steve Ivy. All rights reserved.
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ *
+ * @type {exports}
+ */
+
+var dgram = require('dgram'),
+       dns   = require('dns');
+
+/**
+ * The UDP Client for StatsD
+ * @param options
+ *   @option host      {String}  The host to connect to default: localhost
+ *   @option port      {String|Integer} The port to connect to default: 8125
+ *   @option prefix    {String}  An optional prefix to assign to each stat 
name sent
+ *   @option suffix    {String}  An optional suffix to assign to each stat 
name sent
+ *   @option txstatsd  {boolean} An optional boolean that if true swaps 
counters for meters
+ *   @option globalize {boolean} An optional boolean to add "statsd" as an 
object in the global namespace
+ *   @option cacheDns  {boolean} An optional boolean to only look up the 
hostname -> IP address once
+ *   @option mock      {boolean} An optional boolean indicating this Client is 
a mock object, no stats are sent.
+ * @constructor
+ */
+var Client = function (host, port, prefix, suffix, txstatsd, globalize, 
cacheDns, mock) {
+       var options = host || {},
+               self = this;
+
+       if(arguments.length > 1 || typeof(host) === 'string'){
+               options = {
+                       host      : host,
+                       port      : port,
+                       prefix    : prefix,
+                       suffix    : suffix,
+                       txstatsd  : txstatsd,
+                       globalize : globalize,
+                       cacheDns  : cacheDns,
+                       mock      : mock === true
+               };
+       }
+
+       this.host   = options.host || 'localhost';
+       this.port   = options.port || 8125;
+       this.prefix = options.prefix || '';
+       this.suffix = options.suffix || '';
+       this.txstatsd = options.txstatsd || false;
+       this.socket = dgram.createSocket('udp4');
+       this.mock   = options.mock;
+
+
+       if(options.cacheDns === true){
+               dns.lookup(options.host, function(err, address, family){
+                       if(err == null){
+                               self.host = address;
+                       }
+               });
+       }
+
+       if(options.globalize){
+               global.statsd = this;
+       }
+};
+
+/**
+ * Represents the timing stat
+ * @param stat {String|Array} The stat(s) to send
+ * @param time {Number} The time in milliseconds to send
+ * @param sampleRate {Number} The Number of times to sample (0 to 1)
+ * @param callback {Function} Callback when message is done being delivered. 
Optional.
+ */
+Client.prototype.timing = function (stat, time, sampleRate, callback) {
+       this.sendAll(stat, time, 'ms', sampleRate, callback);
+};
+
+/**
+ * Increments a stat by a specified amount
+ * @param stat {String|Array} The stat(s) to send
+ * @param value The value to send
+ * @param sampleRate {Number} The Number of times to sample (0 to 1)
+ * @param callback {Function} Callback when message is done being delivered. 
Optional.
+ */
+Client.prototype.increment = function (stat, value, sampleRate, callback) {
+       var type = this.txstatsd ? 'm' : 'c';
+       this.sendAll(stat, value || 1, type, sampleRate, callback);
+};
+
+/**
+ * Gauges a stat by a specified amount
+ * @param stat {String|Array} The stat(s) to send
+ * @param value The value to send
+ * @param sampleRate {Number} The Number of times to sample (0 to 1)
+ * @param callback {Function} Callback when message is done being delivered. 
Optional.
+ */
+Client.prototype.gauge = function (stat, value, sampleRate, callback) {
+       this.sendAll(stat, value, 'g', sampleRate, callback);
+};
+
+/**
+ * Counts unique values by a specified amount
+ * @param stat {String|Array} The stat(s) to send
+ * @param value The value to send
+ * @param sampleRate {Number} The Number of times to sample (0 to 1)
+ * @param callback {Function} Callback when message is done being delivered. 
Optional.
+ */
+Client.prototype.unique =
+       Client.prototype.set = function (stat, value, sampleRate, callback) {
+               this.sendAll(stat, value, 's', sampleRate, callback);
+       };
+
+/**
+ * Checks if stats is an array and sends all stats calling back once all have 
sent
+ * @param stat {String|Array} The stat(s) to send
+ * @param value The value to send
+ * @param sampleRate {Number} The Number of times to sample (0 to 1)
+ * @param callback {Function} Callback when message is done being delivered. 
Optional.
+ */
+Client.prototype.sendAll = function(stat, value, type, sampleRate, callback){
+       var completed = 0,
+               calledback = false,
+               sentBytes = 0,
+               self = this;
+
+       /**
+        * Gets called once for each callback, when all callbacks return we will
+        * call back from the function
+        * @private
+        */
+       function onSend(error, bytes){
+               completed += 1;
+               if(calledback || typeof callback !== 'function'){
+                       return;
+               }
+
+               if(error){
+                       calledback = true;
+                       return callback(error);
+               }
+
+               sentBytes += bytes;
+               if(completed === stat.length){
+                       callback(null, sentBytes);
+               }
+       }
+
+       if(Array.isArray(stat)){
+               stat.forEach(function(item){
+                       self.send(item, value, type, sampleRate, onSend);
+               });
+       } else {
+               this.send(stat, value, type, sampleRate, callback);
+       }
+};
+
+/**
+ * Sends a stat across the wire
+ * @param stat {String|Array} The stat(s) to send
+ * @param value The value to send
+ * @param type {String} The type of message to send to statsd
+ * @param sampleRate {Number} The Number of times to sample (0 to 1)
+ * @param callback {Function} Callback when message is done being delivered. 
Optional.
+ */
+Client.prototype.send = function (stat, value, type, sampleRate, callback) {
+       var message = this.prefix + stat + this.suffix + ':' + value + '|' + 
type,
+               buf;
+
+       if(sampleRate && sampleRate < 1){
+               if(Math.random() < sampleRate){
+                       message += '|@' + sampleRate;
+               } else {
+                       //don't want to send if we don't meet the sample ratio
+                       return;
+               }
+       }
+
+       // Only send this stat if we're not a mock Client.
+       if(!this.mock) {
+               buf = new Buffer(message);
+               this.socket.send(buf, 0, buf.length, this.port, this.host, 
callback);
+       } else {
+               if(typeof callback === 'function'){
+                       callback(null, 0);
+               }
+       }
+};
+
+module.exports = Client;
\ No newline at end of file
diff --git a/lib/threads/backend.js b/lib/threads/backend.js
index 9d46d66..b4c69fd 100644
--- a/lib/threads/backend.js
+++ b/lib/threads/backend.js
@@ -40,6 +40,8 @@
  * @param config_obj Configuration object
  */
 function initBackend(config_obj) {
+       var writer, paths;
+
        config = config_obj;
        if (!config.backend.temp_dir) {
                if (os.tmpdir) {
@@ -55,6 +57,26 @@
                config.redis.port,
                config.redis.password
        );
+
+       // Do some brief sanity checking
+       paths = [
+               config.backend.temp_dir,
+               config.backend.bundler.bin
+       ];
+       for (writer in config.backend.writers) {
+               if (config.backend.writers.hasOwnProperty(writer)) {
+                       paths.push(config.backend.writers[writer].bin);
+               }
+       }
+       paths.forEach(function(value) {
+               if (!fs.existsSync(value)) {
+                       console.error(
+                               "Configuration error: Cannot determine if %s 
exists!",
+                               path.resolve(value)
+                       );
+                       process.exit(1);
+               }
+       });
 }
 
 /**
@@ -154,7 +176,7 @@
        ]);
        child.on('error', function(err) {
                console.error('Bundler child reported back with spawn error: 
%s', err);
-               jobDetails.updateError('Could not launch bunlding process');
+               jobDetails.updateError('Could not launch bundling process');
                redisClient.hset(config.redis.status_set_name, 
jobDetails.collectionId, JSON.stringify(jobDetails));
                getNewItemFromQueue();
        });
diff --git a/lib/threads/frontend.js b/lib/threads/frontend.js
index 58a3fe8..8f3b18d 100644
--- a/lib/threads/frontend.js
+++ b/lib/threads/frontend.js
@@ -155,6 +155,7 @@
                args = url.parse(request.url, true ).query;
                handleRequest(args, response);
        } else {
+               statsd.increment('frontend.requests.malformed');
                response.writeHead(405, "Only GET and POST requests are 
supported");
                response.end();
        }
@@ -167,7 +168,10 @@
  * @param response http.ServerResponse
  */
 function handleRequest(args, response) {
+       var time = Date.now();
+
        console.debug('attempting to handle request: %s', JSON.stringify(args));
+       statsd.increment('frontend.requests.' + args.command);
        try {
                switch (args.command) {
                        case 'health':
@@ -203,6 +207,8 @@
                        server.close(function() {process.exit(1);});
                }
        }
+
+       statsd.timing('frontend.requests.' + args.command + 'response_time', 
Date.now() - time);
 }
 
 /**
diff --git a/mw-ocg-service.js b/mw-ocg-service.js
index dccb4af..3faa163 100755
--- a/mw-ocg-service.js
+++ b/mw-ocg-service.js
@@ -33,6 +33,7 @@
 
 var cluster = require('cluster');
 var commander = require('commander');
+var StatsD = require('./lib/statsd.js');
 var os = require('os');
 require('rconsole');
 
@@ -40,7 +41,7 @@
 var config = require('./defaults.js');
 commander
        .version('0.0.1')
-       .option('-c, --config <path>', 'Path to the local configuration file', 
'/etc/mw-ocg-service.js')
+       .option('-c, --config <path>', 'Path to the local configuration file')
        .parse(process.argv);
 
 try {
@@ -49,6 +50,7 @@
        }
 } catch(err) {
        console.log("Could not open configuration file %s! %s", 
commander.config, err);
+       process.exit(1);
 }
 
 /* === Initial Logging ===================================================== */
@@ -56,15 +58,16 @@
        facility: 'local0',
        title: 'mw-ocg-service'
 });
-
-/* === Downgrade our permissions =========================================== */
-var runtimeUser = config.coordinator.runtime_user || process.getuid();
-try {
-       process.setuid(runtimeUser);
-} catch (err) {
-       console.error('Could not set user to "%s": %s', runtimeUser, err);
-       process.exit(1);
-}
+new StatsD(
+       config.reporting.statsd_server,
+       config.reporting.statsd_port,
+       config.reporting.prefix,
+       '',
+       config.reporting.is_txstatsd,
+       true,
+       true,
+       config.reporting.enable
+);
 
 /* === Fork the heck out of ourselves! ========================================
 * The basic idea is that we have this controlling process which launches and
@@ -82,7 +85,7 @@
 if (cluster.isMaster) {
        var respawnWorkers = true;
        var workerTypes = {};
-       var newWorker = null;
+       var lastRestart = {};
        var autoThreads;
        var i;
 
@@ -125,6 +128,14 @@
        process.on('SIGTERM', gracefulShutdown);
        process.on('SIGHUP', immediateShutdown);
 
+       var spawnWorker = function spawnWorker(workerType) {
+               var newWorker = null;
+               lastRestart[workerType] = Date.now();
+               statsd.increment(workerType + '.restarts');
+               newWorker = cluster.fork({COLLECTOID_CHILD_TYPE: workerType});
+               workerTypes[newWorker.process.pid] = workerType;
+       };
+
        cluster.on('disconnect', function(worker) {
                console.info(
                        'Worker (pid %d) has disconnected. Suicide: %s. 
Restarting: %s.',
@@ -133,15 +144,19 @@
                        respawnWorkers
                );
                if (respawnWorkers) {
-                       newWorker = cluster.fork({COLLECTOID_CHILD_TYPE: 
workerTypes[worker.process.pid]});
-                       workerTypes[newWorker.process.pid] = 
workerTypes[worker.process.pid];
+                       if (lastRestart[workerTypes[worker.process.pid]] > 
Date.now() - 1000) {
+                               // Only allow a restart of a backend thread 
once a second
+                               console.info("Cannot immediately respawn 
thread. Waiting 1s to avoid forkbombing.")
+                               setTimeout(spawnWorker, 1000, 
workerTypes[worker.process.pid]);
+                       } else {
+                               spawnWorker(workerTypes[worker.process.pid]);
+                       }
                }
                delete workerTypes[worker.process.pid];
        });
 
        for (i = 0; i < config.coordinator.frontend_threads; i++) {
-               newWorker = cluster.fork({COLLECTOID_CHILD_TYPE: 'frontend'});
-               workerTypes[newWorker.process.pid] = 'frontend';
+               spawnWorker('frontend');
        }
 
        autoThreads = config.coordinator.backend_threads;
@@ -149,8 +164,7 @@
                autoThreads = os.cpus().length;
        }
        for (i = 0; i < autoThreads; i++) {
-               newWorker = cluster.fork({COLLECTOID_CHILD_TYPE: 'backend'});
-               workerTypes[newWorker.process.pid] = 'backend';
+               spawnWorker('backend');
        }
 
 } else {
diff --git a/package.json b/package.json
index b21bf6f..d41f135 100644
--- a/package.json
+++ b/package.json
@@ -18,7 +18,7 @@
   "dependencies": {
     "async": "~0.2.9",
     "busboy": "0.0.12",
-       "commander": "~2.1.0",
+    "commander": "~2.1.0",
     "mime": "~1.2.11",
     "rconsole": "~0.2.0",
     "redis": "~0.9.0"

-- 
To view, visit https://gerrit.wikimedia.org/r/137231
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I38be8c0f8654d98213d4dc4d849818c661922e50
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/Collection/OfflineContentGenerator
Gerrit-Branch: master
Gerrit-Owner: Mwalker <mwal...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to