Bmansurov has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/392932 )
Change subject: Abort render on connection close ...................................................................... Abort render on connection close When the client closes the connection: - if the task is still waiting in the queue, remove it; - if the task has already started, abort render. Bug: T180604 Change-Id: I47f6847948ed8903c54fdaaf4fcb5ff021d46c76 --- M lib/queue.js M package.json M routes/html2pdf-v1.js M test/lib/queue.js 4 files changed, 134 insertions(+), 20 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/chromium-render refs/changes/32/392932/1 diff --git a/lib/queue.js b/lib/queue.js index 3330f79..360db88 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -2,8 +2,6 @@ const asyncQueue = require('async/queue'); const asyncTimeout = require('async/timeout'); -const Renderer = require('./renderer'); -const uuid = require('cassandra-uuid'); // Errors used as the first argument of the callback passed to the queue const callbackErrors = { @@ -38,15 +36,16 @@ * @param {Object} puppeteerOptions options used to in starting puppeteer * @param {Object} pdfOptions pdf options passed to Chromium * @param {Object} logger app logger + * @param {Renderer} renderer an instance of PDF renderer */ - constructor(queueOptions, puppeteerOptions, pdfOptions, logger) { + constructor(queueOptions, puppeteerOptions, pdfOptions, logger, renderer) { this._queueObject = asyncQueue(this._worker.bind(this), queueOptions.concurrency); this._puppeteerOptions = puppeteerOptions; this._pdfOptions = pdfOptions; this._options = queueOptions; this._logger = logger; - this._renderer = new Renderer(); + this._renderer = renderer; } /** @@ -81,14 +80,13 @@ const queue = this._queueObject; const timeout = this._options.queueTimeout; - data._id = `${uuid.TimeUuid.now().toString()}|${data.uri}`; data._timeoutID = setTimeout(() => { queue.remove((worker) => { - if (worker.data._id === data._id) { + if (worker.data.id === data.id) { logger.log( 'warn/queue', `Queue is still busy after waiting ` + - `for ${timeout} secs. Data ID: ${data._id}.` + `for ${timeout} secs. Data ID: ${data.id}.` ); callback(callbackErrors.queueBusy, null); return true; @@ -129,7 +127,7 @@ this._setCancelTaskTimeout(data, callback); const queueSize = this._countJobsInQueue(); this._logger.log( - 'debug/queue', `Job ${data._id} added to the queue. Jobs waiting: ${queueSize}` + 'debug/queue', `Job ${data.id} added to the queue. Jobs waiting: ${queueSize}` ); this._queueObject.push(data, callback); } @@ -145,7 +143,7 @@ */ _worker(data, callback) { this._clearCancelTaskTimeout(data); - this._logger.log('info/queue', `Started rendering ${data._id}`); + this._logger.log('info/queue', `Started rendering ${data.id}`); const timeout = this._options.executionTimeout; const timedRender = asyncTimeout(this._render.bind(this), @@ -157,7 +155,7 @@ this._logger.log( 'error/render', `Aborting. Render hasn't finished within ${timeout} ` + - `seconds. Data ID: ${data._id}.` + `seconds. Data ID: ${data.id}.` ); renderer.abortRender(); callback(callbackErrors.renderTimeout, null); @@ -178,18 +176,47 @@ this._pdfOptions) .then((pdf) => { this._logger.log( - 'debug/queue', `Job ${data._id} rendered successfully` + 'debug/queue', `Job ${data.id} rendered successfully` ); callback(null, pdf); }) .catch((error) => { this._logger.log('error/render', { - msg: `Cannot convert page ${data.uri} to PDF. Error: ${error.toString()}`, + msg: `Data ID: ${data.id} to PDF. ${error.toString()}`, error }); callback(callbackErrors.renderFailed, null); }); } + + /** + * Abort task identified by `id` + * @param {string} id ID initially passed as part of data + */ + abort(id) { + let taskStarted = true; + + // has the task started already? + this._queueObject.remove((worker) => { + if (worker.data.id === id) { + this._logger.log( + 'debug/queue', + `Removing task from the queue. Data ID: ${id}.` + ); + taskStarted = false; + return true; + } + return false; + }); + + if (taskStarted) { + this._logger.log( + 'debug/render', + `Aborting render. Data ID: ${id}.` + ); + this._renderer.abortRender(); + } + } } module.exports = { diff --git a/package.json b/package.json index c2dd183..9f2449e 100644 --- a/package.json +++ b/package.json @@ -37,13 +37,13 @@ "compression": "^1.7.1", "domino": "^1.0.30", "express": "^4.16.2", + "http-shutdown": "^1.2.0", "js-yaml": "^3.10.0", "preq": "^0.5.3", "puppeteer": "^0.11.0", "service-runner": "^2.4.2", "swagger-router": "^0.7.1", - "swagger-ui": "git+https://github.com/wikimedia/swagger-ui#master", - "http-shutdown": "^1.2.0" + "swagger-ui": "git+https://github.com/wikimedia/swagger-ui#master" }, "devDependencies": { "extend": "^3.0.1", diff --git a/routes/html2pdf-v1.js b/routes/html2pdf-v1.js index 923d25b..41625a3 100644 --- a/routes/html2pdf-v1.js +++ b/routes/html2pdf-v1.js @@ -2,6 +2,8 @@ const { callbackErrors, Queue } = require('../lib/queue'); const sUtil = require('../lib/util'); +const uuid = require('cassandra-uuid'); +const Renderer = require('../lib/renderer'); /** * The main router object @@ -26,7 +28,9 @@ } }); + const id = `${uuid.TimeUuid.now().toString()}|${restbaseRequest.uri}`; app.queue.push({ + id, uri: restbaseRequest.uri, format: req.params.format }, ((error, pdf) => { @@ -57,12 +61,22 @@ res.writeHead(200, headers); res.end(pdf, 'binary'); })); + + req.on('close', () => { + app.logger.log( + 'debug/request', + `Connection closed by the client. ` + + `Will try and cancel the task with ID ${id}.` + ); + app.queue.abort(id); + }); }); module.exports = function(appObj) { app = appObj; const conf = app.conf; + app.queue = new Queue( { concurrency: conf.render_concurrency, @@ -72,7 +86,8 @@ }, conf.puppeteer_options, conf.pdf_options, - app.logger + app.logger, + new Renderer() ); // the returned object mounts the routes on diff --git a/test/lib/queue.js b/test/lib/queue.js index 58a63af..f4d8ed3 100644 --- a/test/lib/queue.js +++ b/test/lib/queue.js @@ -3,6 +3,7 @@ const assert = require('../utils/assert.js'); const { callbackErrors, Queue } = require('../../lib/queue'); const logger = { log: () => {} }; +const renderer = { abortRender: () => {} }; const puppeteerFlags = [ '--no-sandbox', '--disable-setuid-sandbox' @@ -23,7 +24,7 @@ } }; -describe('concurrency', function() { +describe('Queue', function() { this.timeout(5000); it('should run only one worker at a time', function(done) { @@ -45,7 +46,7 @@ queueTimeout: 90, executionTimeout: 90, maxTaskCount: 3 - }, puppeteerFlags, pdfOptions, logger); + }, puppeteerFlags, pdfOptions, logger, renderer); // first worker must finish after 1 sec q.push({ @@ -93,7 +94,7 @@ queueTimeout: 5, executionTimeout: 90, maxTaskCount: 1 - }, puppeteerFlags, pdfOptions, logger); + }, puppeteerFlags, pdfOptions, logger, renderer); // first worker completes in 3 seconds q.push({ @@ -135,7 +136,7 @@ queueTimeout: 1, executionTimeout: 90, maxTaskCount: 3 - }, puppeteerFlags, pdfOptions, logger); + }, puppeteerFlags, pdfOptions, logger, renderer); // first worker completes in 3 seconds q.push({ @@ -180,7 +181,7 @@ queueTimeout: 30, executionTimeout: 1, maxTaskCount: 10 - }, puppeteerFlags, pdfOptions, logger); + }, puppeteerFlags, pdfOptions, logger, renderer); q.push({ id: 1, @@ -191,4 +192,75 @@ done(); }); }); + + it('should remove task from queue when task is aborted', function(done) { + class QueueTest extends Queue { + _render(data, callback) { + // simulate render + setTimeout(() => { + callback(null, {}); + }, data.timeout); + } + } + const q = new QueueTest({ + concurrency: 1, + queueTimeout: 30, + executionTimeout: 10, + maxTaskCount: 10 + }, puppeteerFlags, pdfOptions, logger, renderer); + + q.push({ + id: 1, + timeout: 1000 + }, (error, data) => { + assert.ok(error === null, + 'Render finished.'); + assert.ok(q._countJobsInQueue() === 0, + 'Queue is empty.'); + done(); + }); + + q.push({ + id: 2, + timeout: 500 + }, (error, data) => { + assert(false, 'Callback should never be called as the job has ' + + 'been removed from the queue.'); + }); + q.abort(2); + }); + + it('should abort render when task is aborted', function(done) { + class QueueTest extends Queue { + _render(data, callback) { + // simulate render + setTimeout(() => { + callback(null, {}); + }, data.timeout); + } + } + const q = new QueueTest({ + concurrency: 1, + queueTimeout: 30, + executionTimeout: 10, + maxTaskCount: 10 + }, puppeteerFlags, pdfOptions, logger, { + abortRender: () => { + assert.ok(true, 'Renderer abort is called.'); + done(); + } + }); + + q.push({ + id: 1, + timeout: 5000 + }, (error, data) => { + }); + + // wait a little for the task to start + setTimeout(() => { + q.abort(1); + }, 20); + }); + }); -- To view, visit https://gerrit.wikimedia.org/r/392932 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I47f6847948ed8903c54fdaaf4fcb5ff021d46c76 Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/services/chromium-render Gerrit-Branch: master Gerrit-Owner: Bmansurov <bmansu...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits