Bmansurov has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/392932 )

Change subject: Abort render on connection close
......................................................................

Abort render on connection close

When the client closes the connection:
- if the task is still waiting in the queue, remove it;
- if the task has already started, abort render.

Bug: T180604
Change-Id: I47f6847948ed8903c54fdaaf4fcb5ff021d46c76
---
M lib/queue.js
M package.json
M routes/html2pdf-v1.js
M test/lib/queue.js
4 files changed, 134 insertions(+), 20 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/chromium-render refs/changes/32/392932/1

diff --git a/lib/queue.js b/lib/queue.js
index 3330f79..360db88 100644
--- a/lib/queue.js
+++ b/lib/queue.js
@@ -2,8 +2,6 @@
 
 const asyncQueue = require('async/queue');
 const asyncTimeout = require('async/timeout');
-const Renderer = require('./renderer');
-const uuid = require('cassandra-uuid');
 
 // Errors used as the first argument of the callback passed to the queue
 const callbackErrors = {
@@ -38,15 +36,16 @@
      * @param {Object} puppeteerOptions options used to in starting puppeteer
      * @param {Object} pdfOptions pdf options passed to Chromium
      * @param {Object} logger app logger
+     * @param {Renderer} renderer an instance of PDF renderer
      */
-    constructor(queueOptions, puppeteerOptions, pdfOptions, logger) {
+    constructor(queueOptions, puppeteerOptions, pdfOptions, logger, renderer) {
         this._queueObject = asyncQueue(this._worker.bind(this),
                                        queueOptions.concurrency);
         this._puppeteerOptions = puppeteerOptions;
         this._pdfOptions = pdfOptions;
         this._options = queueOptions;
         this._logger = logger;
-        this._renderer = new Renderer();
+        this._renderer = renderer;
     }
 
     /**
@@ -81,14 +80,13 @@
         const queue = this._queueObject;
         const timeout = this._options.queueTimeout;
 
-        data._id = `${uuid.TimeUuid.now().toString()}|${data.uri}`;
         data._timeoutID = setTimeout(() => {
             queue.remove((worker) => {
-                if (worker.data._id === data._id) {
+                if (worker.data.id === data.id) {
                     logger.log(
                         'warn/queue',
                         `Queue is still busy after waiting ` +
-                            `for ${timeout} secs. Data ID: ${data._id}.`
+                            `for ${timeout} secs. Data ID: ${data.id}.`
                     );
                     callback(callbackErrors.queueBusy, null);
                     return true;
@@ -129,7 +127,7 @@
         this._setCancelTaskTimeout(data, callback);
         const queueSize = this._countJobsInQueue();
         this._logger.log(
-            'debug/queue', `Job ${data._id} added to the queue. Jobs waiting: ${queueSize}`
+            'debug/queue', `Job ${data.id} added to the queue. Jobs waiting: ${queueSize}`
         );
         this._queueObject.push(data, callback);
     }
@@ -145,7 +143,7 @@
      */
     _worker(data, callback) {
         this._clearCancelTaskTimeout(data);
-        this._logger.log('info/queue', `Started rendering ${data._id}`);
+        this._logger.log('info/queue', `Started rendering ${data.id}`);
 
         const timeout = this._options.executionTimeout;
         const timedRender = asyncTimeout(this._render.bind(this),
@@ -157,7 +155,7 @@
                 this._logger.log(
                     'error/render',
                     `Aborting. Render hasn't finished within ${timeout} ` +
-                        `seconds. Data ID: ${data._id}.`
+                        `seconds. Data ID: ${data.id}.`
                 );
                 renderer.abortRender();
                 callback(callbackErrors.renderTimeout, null);
@@ -178,18 +176,47 @@
                           this._pdfOptions)
             .then((pdf) => {
                 this._logger.log(
-                    'debug/queue', `Job ${data._id} rendered successfully`
+                    'debug/queue', `Job ${data.id} rendered successfully`
                 );
                 callback(null, pdf);
             })
             .catch((error) => {
                 this._logger.log('error/render', {
-                    msg: `Cannot convert page ${data.uri} to PDF. Error: ${error.toString()}`,
+                    msg: `Data ID: ${data.id} to PDF. ${error.toString()}`,
                     error
                 });
                 callback(callbackErrors.renderFailed, null);
             });
     }
+
+    /**
+     * Abort task identified by `id`
+     * @param {string} id ID initially passed as part of data
+     */
+    abort(id) {
+        let taskStarted = true;
+
+        // has the task started already?
+        this._queueObject.remove((worker) => {
+            if (worker.data.id === id) {
+                this._logger.log(
+                    'debug/queue',
+                    `Removing task from the queue. Data ID: ${id}.`
+                );
+                taskStarted = false;
+                return true;
+            }
+            return false;
+        });
+
+        if (taskStarted) {
+            this._logger.log(
+                'debug/render',
+                `Aborting render. Data ID: ${id}.`
+            );
+            this._renderer.abortRender();
+        }
+    }
 }
 
 module.exports = {
diff --git a/package.json b/package.json
index c2dd183..9f2449e 100644
--- a/package.json
+++ b/package.json
@@ -37,13 +37,13 @@
     "compression": "^1.7.1",
     "domino": "^1.0.30",
     "express": "^4.16.2",
+    "http-shutdown": "^1.2.0",
     "js-yaml": "^3.10.0",
     "preq": "^0.5.3",
     "puppeteer": "^0.11.0",
     "service-runner": "^2.4.2",
     "swagger-router": "^0.7.1",
-    "swagger-ui": "git+https://github.com/wikimedia/swagger-ui#master",
-    "http-shutdown": "^1.2.0"
+    "swagger-ui": "git+https://github.com/wikimedia/swagger-ui#master"
   },
   "devDependencies": {
     "extend": "^3.0.1",
diff --git a/routes/html2pdf-v1.js b/routes/html2pdf-v1.js
index 923d25b..41625a3 100644
--- a/routes/html2pdf-v1.js
+++ b/routes/html2pdf-v1.js
@@ -2,6 +2,8 @@
 
 const { callbackErrors, Queue } = require('../lib/queue');
 const sUtil = require('../lib/util');
+const uuid = require('cassandra-uuid');
+const Renderer = require('../lib/renderer');
 
 /**
  * The main router object
@@ -26,7 +28,9 @@
         }
     });
 
+    const id = `${uuid.TimeUuid.now().toString()}|${restbaseRequest.uri}`;
     app.queue.push({
+        id,
         uri: restbaseRequest.uri,
         format: req.params.format
     }, ((error, pdf) => {
@@ -57,12 +61,22 @@
         res.writeHead(200, headers);
         res.end(pdf, 'binary');
     }));
+
+    req.on('close', () => {
+        app.logger.log(
+            'debug/request',
+            `Connection closed by the client. ` +
+            `Will try and cancel the task with ID ${id}.`
+        );
+        app.queue.abort(id);
+    });
 });
 
 module.exports = function(appObj) {
     app = appObj;
 
     const conf = app.conf;
+
     app.queue = new Queue(
         {
             concurrency: conf.render_concurrency,
@@ -72,7 +86,8 @@
         },
         conf.puppeteer_options,
         conf.pdf_options,
-        app.logger
+        app.logger,
+        new Renderer()
     );
 
     // the returned object mounts the routes on
diff --git a/test/lib/queue.js b/test/lib/queue.js
index 58a63af..f4d8ed3 100644
--- a/test/lib/queue.js
+++ b/test/lib/queue.js
@@ -3,6 +3,7 @@
 const assert = require('../utils/assert.js');
 const { callbackErrors, Queue } = require('../../lib/queue');
 const logger = { log: () => {} };
+const renderer = { abortRender: () => {} };
 const puppeteerFlags = [
     '--no-sandbox',
     '--disable-setuid-sandbox'
@@ -23,7 +24,7 @@
     }
 };
 
-describe('concurrency', function() {
+describe('Queue', function() {
     this.timeout(5000);
 
     it('should run only one worker at a time', function(done) {
@@ -45,7 +46,7 @@
             queueTimeout: 90,
             executionTimeout: 90,
             maxTaskCount: 3
-        }, puppeteerFlags, pdfOptions, logger);
+        }, puppeteerFlags, pdfOptions, logger, renderer);
 
         // first worker must finish after 1 sec
         q.push({
@@ -93,7 +94,7 @@
             queueTimeout: 5,
             executionTimeout: 90,
             maxTaskCount: 1
-        }, puppeteerFlags, pdfOptions, logger);
+        }, puppeteerFlags, pdfOptions, logger, renderer);
 
         // first worker completes in 3 seconds
         q.push({
@@ -135,7 +136,7 @@
             queueTimeout: 1,
             executionTimeout: 90,
             maxTaskCount: 3
-        }, puppeteerFlags, pdfOptions, logger);
+        }, puppeteerFlags, pdfOptions, logger, renderer);
 
         // first worker completes in 3 seconds
         q.push({
@@ -180,7 +181,7 @@
             queueTimeout: 30,
             executionTimeout: 1,
             maxTaskCount: 10
-        }, puppeteerFlags, pdfOptions, logger);
+        }, puppeteerFlags, pdfOptions, logger, renderer);
 
         q.push({
             id: 1,
@@ -191,4 +192,75 @@
             done();
         });
     });
+
+    it('should remove task from queue when task is aborted', function(done) {
+        class QueueTest extends Queue {
+            _render(data, callback) {
+                // simulate render
+                setTimeout(() => {
+                    callback(null, {});
+                }, data.timeout);
+            }
+        }
+        const q = new QueueTest({
+            concurrency: 1,
+            queueTimeout: 30,
+            executionTimeout: 10,
+            maxTaskCount: 10
+        }, puppeteerFlags, pdfOptions, logger, renderer);
+
+        q.push({
+            id: 1,
+            timeout: 1000
+        }, (error, data) => {
+            assert.ok(error === null,
+                      'Render finished.');
+            assert.ok(q._countJobsInQueue() === 0,
+                      'Queue is empty.');
+            done();
+        });
+
+        q.push({
+            id: 2,
+            timeout: 500
+        }, (error, data) => {
+            assert(false, 'Callback should never be called as the job has ' +
+                   'been removed from the queue.');
+        });
+        q.abort(2);
+    });
+
+    it('should abort render when task is aborted', function(done) {
+        class QueueTest extends Queue {
+            _render(data, callback) {
+                // simulate render
+                setTimeout(() => {
+                    callback(null, {});
+                }, data.timeout);
+            }
+        }
+        const q = new QueueTest({
+            concurrency: 1,
+            queueTimeout: 30,
+            executionTimeout: 10,
+            maxTaskCount: 10
+        }, puppeteerFlags, pdfOptions, logger, {
+            abortRender: () => {
+                assert.ok(true, 'Renderer abort is called.');
+                done();
+            }
+        });
+
+        q.push({
+            id: 1,
+            timeout: 5000
+        }, (error, data) => {
+        });
+
+        // wait a little for the task to start
+        setTimeout(() => {
+            q.abort(1);
+        }, 20);
+    });
+
 });

-- 
To view, visit https://gerrit.wikimedia.org/r/392932
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I47f6847948ed8903c54fdaaf4fcb5ff021d46c76
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/chromium-render
Gerrit-Branch: master
Gerrit-Owner: Bmansurov <bmansu...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to