Cscott has uploaded a new change for review. https://gerrit.wikimedia.org/r/284598
Change subject: Improve graceful shutdown. ...................................................................... Improve graceful shutdown. During a graceful shutdown we don't halt background render tasks or garbage collections which are in progress; instead waiting politely for these to finish before shutting down. We also shut down the front end tasks *last* to minimize downtime for those fetching cached resources. Change-Id: Id128e7ad3ba06d4da6302c2c42b8f2c32e58de9b --- M lib/RedisWrapper.js M lib/threads/backend.js M lib/threads/frontend.js M lib/threads/gc.js M mw-ocg-service.js 5 files changed, 125 insertions(+), 70 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/Collection/OfflineContentGenerator refs/changes/98/284598/1 diff --git a/lib/RedisWrapper.js b/lib/RedisWrapper.js index 36d9b12..323a5a6 100644 --- a/lib/RedisWrapper.js +++ b/lib/RedisWrapper.js @@ -86,7 +86,8 @@ /** * Gracefully closes the connection. Emits 'closed' when complete. */ -RedisWrapper.prototype.close = function() { +RedisWrapper.prototype.close = function(cb) { + if (cb) { this.once('closed', cb); } if (this.client && this.client.connected) { this.client.quit(); } else { diff --git a/lib/threads/backend.js b/lib/threads/backend.js index e3a60d1..8723c09 100644 --- a/lib/threads/backend.js +++ b/lib/threads/backend.js @@ -47,6 +47,7 @@ var config = null; var redisClient = null; +var stopping = false; /* === Public Exported Functions =========================================== */ /** @@ -120,16 +121,13 @@ * Starts the backend server */ function startBackend() { - var loop = false; redisClient.on('closed', function() { - if (!loop) { - loop = true; - console.error( - 'Backend connection to redis died unexpectedly.', - { channel: 'backend.error.fatal' } - ); - stopBackend(process.exit); - } + if (stopping) return; + console.error( + 'Backend connection to redis died unexpectedly.', + { channel: 'backend.error.fatal' } + ); + stopBackend(process.exit); }); redisClient.on('opened', getNewItemFromQueue); redisClient.connect(); @@ -145,8 +143,8 @@ * @param {callback} callbackFunc - Function to call when server successfully closed */ function stopBackend(callbackFunc) { - redisClient.close(); - callbackFunc(); + stopping = callbackFunc || function() { }; + // now we have to wait until the backend is done with its job. } /* === Private Functions =================================================== */ @@ -155,43 +153,51 @@ * then starts the promise chain to bundle, render, and cleanup. */ function getNewItemFromQueue() { - redisClient.blpop(config.redis.job_queue_name, 0) + // Before checking the redis queue, see if this thread should shut down + if (stopping) { + console.info( + 'Backend thread halting as requested.', { channel: 'backend' } + ); + return redisClient.close(stopping); + } + redisClient.blpop(config.redis.job_queue_name, 3) .then(function(result) { // The return, if there is one, will be [<listname>,<listitem>] if (!result) { - throw new BackendError('Redis returned nil when picking up new job from queue.'); - } else { - console.info( - 'Got new job "%s", attempting to get status details and launching', - result[1], - { - channel: 'backend', - job: { id: result[1] }, - } - ); - return redisClient.hget(config.redis.status_set_name, result[1]); + // Timeout. Check our host status, and then retry. + return; } - }) - .catch(function(err) { + console.info( + 'Got new job "%s", attempting to get status details and launching', + result[1], + { + channel: 'backend', + job: { id: result[1] }, + } + ); + return redisClient + .hget(config.redis.status_set_name, result[1]) + .then(newItemFromQueue) + .catch(function(err) { + // Catch the error here because it's likely non fatal. + // If this bubbled up it would cause the thread to + // restart -- not the end of the world, but not + // desirable. + console.error('Unhandled error while attempting to process a metabook job', { + channel: 'backend.error', + err: err, + }); + }); + }).then(function() { + // Start the loop again. Use setTimeout so that we don't + // continually grow the stack. + setTimeout(getNewItemFromQueue, 1); + }, function(err) { console.error('Error picking up new job from queue.', { channel: 'backend.error.fatal', err: err, }); stopBackend(process.exit); - }) - .then(newItemFromQueue) - .catch (function(err) { - // Catch the error here because it's likely non fatal. If this bubbled - // up it would cause the thread to restart -- not the end of the world - // but not desirable - console.error('Unhandled error while attempting to process a metabook job', { - channel: 'backend.error', - err: err, - }); - }) - .then(function() { - // Start the loop again. Use setTimeout so that we don't continually grow the stack - setTimeout(getNewItemFromQueue, 1); }) .done(); } diff --git a/lib/threads/frontend.js b/lib/threads/frontend.js index 679913a..ead69ef 100644 --- a/lib/threads/frontend.js +++ b/lib/threads/frontend.js @@ -42,6 +42,7 @@ var eh = require('../errorhelper.js'); var redisClient = null; var started = false; +var stopping = false; var server = null; var sprintf = require('sprintf-js').sprintf; var getFolderSize; // Forward declaration. @@ -70,15 +71,12 @@ var port = config.frontend.port; var address = config.frontend.address; - var loop = false; redisClient.on('closed', function() { - if (!loop) { - loop = true; - console.error('Frontend connection to redis died, killing thread.', { - channel: 'frontend.error.fatal', - }); - stopServer(process.exit); - } + if (stopping) return; + console.error('Frontend connection to redis died, killing thread.', { + channel: 'frontend.error.fatal', + }); + stopServer(process.exit); }); redisClient.connect(); @@ -124,9 +122,17 @@ * @param callbackFunc Function to call when server successfully closed */ function stopServer(callbackFunc) { - redisClient.close(); + callbackFunc = callbackFunc || function() { }; + if (stopping) { return; } if (started) { - server.close(callbackFunc); + stopping = true; + console.info( + 'Frontend thread halting as requested.', + { channel: 'frontend' } + ); + server.close(function() { + redisClient.close(callbackFunc); + }); } else { console.debug('Frontend requested to stop, but it never started.', { channel: 'frontend', diff --git a/lib/threads/gc.js b/lib/threads/gc.js index 83b2ee9..65a9267 100644 --- a/lib/threads/gc.js +++ b/lib/threads/gc.js @@ -42,8 +42,10 @@ var Redis = require('../RedisWrapper.js'); var config = null; +var busy = 0; var running = false; -var intervalTimer = null; +var stopping = false; +var gcTimer = 0; var redisClient = null; /* === Public Exported Functions =========================================== */ @@ -76,7 +78,18 @@ } }); redisClient.on('opened', function() { - intervalTimer = setInterval(doGCRun, config.garbage_collection.every * 1000); + var doGC = function() { + gcTimer = 0; + if (!running) return; + return doGCRun().catch(function(err) { + console.error('GC failure: %s', err, { channel: 'gc' }); + }).then(function() { + if (stopping) return stopping(); + gcTimer = + setTimeout(doGC, config.garbage_collection.every * 1000); + }); + }; + gcTimer = setTimeout(doGC, 0); }); redisClient.connect(); } @@ -104,15 +117,24 @@ * @param callbackFunc Function to call when server successfully closed */ function stopThread(callbackFunc) { + if (busy) { + stopping = function() { stopThread(callbackFunc); }; + return; + } + console.info( + 'GC thread halting as requested.', { channel: 'gc' } + ); running = false; - redisClient.close(); - if (callbackFunc) { setTimeout(callbackFunc, 1); } + stopping = false; + if (gcTimer) { clearTimeout(gcTimer); gcTimer = 0; } + redisClient.close(callbackFunc); } /* ==== The meat === */ function doGCRun() { var startTime = Date.now(); + busy++; return Promise.resolve() .then(cleanExpiredJobStatusObjects) .then(cleanOutputDir) @@ -122,7 +144,7 @@ console.info('Finished GarbageCollection run in %s seconds', ((Date.now() - startTime) / 1000), { channel: 'gc' }); statsd.timing('gc.runtime', (Date.now() - startTime) / 1000); - }); + }).finally(function() { busy--; }); } /** diff --git a/mw-ocg-service.js b/mw-ocg-service.js index 655146e..9f99d66 100755 --- a/mw-ocg-service.js +++ b/mw-ocg-service.js @@ -32,6 +32,9 @@ * @file */ +require('core-js/shim'); +var Promise = require('prfun'); + var cli = require('./lib/cli.js'); var cluster = require('cluster'); var commander = require('commander'); @@ -72,22 +75,38 @@ respawnWorkers = false; console.info('Beginning graceful shutdown'); - for (var id in cluster.workers) { - console.info('Sending shutdown command to worker %s', id); - cluster.workers[id].kill('SIGINT'); - } - - var infoAndExit = function() { - var stillAlive = Object.keys(cluster.workers).length; - if (stillAlive > 0) { - console.info('Still awaiting death of %d workers', stillAlive); - setTimeout(infoAndExit, 1000); - } else { - console.info('All threads killed. Exiting.'); - process.exit(); - } + var shutdownSome = function(which) { + return new Promise(function(resolve, reject) { + var todie = 1, onexit = function(code, signal) { + if ((--todie) === 0) { resolve(); } + }, worker; + for (var id in cluster.workers) { + worker = cluster.workers[id]; + if (which && workerTypes[worker.process.pid] !== which) { + continue; + } + todie++; + console.info( + 'Sending shutdown command to worker %s (%s) [pid %d]', + worker.id, which || 'all', worker.process.pid + ); + worker.once('exit', onexit); + worker.kill('SIGINT'); + } + onexit(); + }); }; - infoAndExit(); + // Shutdown in this order: gc, backend, frontend. + // That way we keep answering frontend requests until the very + // last moment. + shutdownSome('gc') + .then(shutdownSome.bind(null, 'backend')) + .then(shutdownSome.bind(null, 'frontend')) + .then(shutdownSome.bind(null, null /* everything else */)) + .then(function() { + console.info('All threads killed. Exiting.'); + process.exit(0); + }).done(); }; var immediateShutdown = function immediateShutdown() { @@ -169,6 +188,7 @@ } process.on('SIGINT', function() { + if (!cluster.worker.suicide) { return; } // Master wants us to die :( console.debug('%s worker received SIGINT', process.env.OCG_SERVICE_CHILD_TYPE); child.stop(process.exit); -- To view, visit https://gerrit.wikimedia.org/r/284598 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Id128e7ad3ba06d4da6302c2c42b8f2c32e58de9b Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/extensions/Collection/OfflineContentGenerator Gerrit-Branch: master Gerrit-Owner: Cscott <canan...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits