DCausse has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/395000 )
Change subject: Use restify instead of lowlevel sockets
......................................................................
Use restify instead of lowlevel sockets
Probably overkill but dealing with request boudaries seemed
a bit hard to do when working directly on sockets.
Also I'm wondering if could not use the filesystem as a locking
mechanism rather than creating a child process.
Not entirely sure but using restify I'm unable to keep the
process running onComplete runs leading to errors preventing the
unix socket to be closed (added a hack to unlink it).
Change-Id: Ic0473887c4eb10d3dca3b7cfafabf3ab31654b1d
---
M package.json
M tests/integration/config/wdio.conf.js
M tests/integration/features/support/world.js
M tests/integration/lib/tracker.js
4 files changed, 99 insertions(+), 147 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/CirrusSearch
refs/changes/00/395000/1
diff --git a/package.json b/package.json
index aabf898..cf4a6b3 100644
--- a/package.json
+++ b/package.json
@@ -20,6 +20,9 @@
"stylelint": "7.8.0",
"stylelint-config-wikimedia": "0.4.1",
"wdio-cucumber-framework": "^1.0.1",
- "webdriverio": "^4.8.0"
+ "webdriverio": "^4.8.0",
+ "restify": "^6.3.4",
+ "request": "^2.83.0",
+ "request-promise-native": "^1.0.5"
}
}
diff --git a/tests/integration/config/wdio.conf.js
b/tests/integration/config/wdio.conf.js
index 1af44b6..3f537ef 100644
--- a/tests/integration/config/wdio.conf.js
+++ b/tests/integration/config/wdio.conf.js
@@ -7,13 +7,15 @@
'use strict';
const child_process = require( 'child_process' ),
- path = require( 'path' );
+ path = require( 'path' ),
+ fs = require('fs');
function relPath( foo ) {
return path.resolve( __dirname, '../..', foo );
}
-var forkedTracker;
+let forkedTracker;
+let unixSocket;
exports.config = {
@@ -217,7 +219,7 @@
// Gets executed once before all workers get launched.
onPrepare: function ( config ) {
forkedTracker = child_process.fork( relPath(
'./integration/lib/tracker.js' ) );
- forkedTracker.send( { config: config } );
+ unixSocket = config.trackerPath;
return new Promise( ( resolve, reject ) => {
forkedTracker.on( 'message', ( msg ) => {
if ( msg.initialized ) {
@@ -226,6 +228,7 @@
reject( msg.error );
}
} );
+ forkedTracker.send( { config: config } );
} );
},
//
@@ -288,6 +291,12 @@
// possible to defer the end of the process using a promise.
onComplete: function() {
// TODO: Is this method being called a guarantee, or should we
handle signals to be sure?
- forkedTracker.send( { exit: true } );
+ try {
+ forkedTracker.send({exit: true});
+ } catch (err) {
+ console.log( `Failed to send exit signal to tracker:
${err}`);
+ // Force unlinking the socket
+ fs.unlinkSync(unixSocket);
+ }
}
};
diff --git a/tests/integration/features/support/world.js
b/tests/integration/features/support/world.js
index 56c12bb..7b95499 100644
--- a/tests/integration/features/support/world.js
+++ b/tests/integration/features/support/world.js
@@ -11,7 +11,7 @@
* keeps a user/login state).
*/
const {defineSupportCode} = require( 'cucumber' ),
- net = require( 'net' ),
+ request = require('request-promise-native'),
log = require( 'semlog' ).log,
Bot = require( 'mwbot' ),
StepHelpers = require( '../step_definitions/page_step_helpers' ),
@@ -24,29 +24,8 @@
class TagClient {
constructor( options ) {
this.tags = {};
- this.connection = new net.Socket();
- this.connection.connect( options.trackerPath );
- this.nextRequestId = 0;
- this.pendingResponses = {};
+ this.unixSocketPath = options.trackerPath;
this.silentLog = options.logLevel !== 'verbose';
- this.connection.on( 'data', ( data ) => {
- let parsed = JSON.parse( data );
- log( `[D] TAG << ${parsed.requestId}: ${data}`,
this.silentLog );
- if ( parsed && this.pendingResponses[parsed.requestId]
) {
- this.pendingResponses[parsed.requestId]( parsed
);
- delete this.pendingResponses[parsed.requestId];
- }
- } );
- }
-
- request( req ) {
- req.requestId = this.nextRequestId++;
- return new Promise( ( resolve ) => {
- let data = JSON.stringify( req );
- log( `[D] TAG >> ${data}`, this.silentLog );
- this.pendingResponses[req.requestId] = resolve;
- this.connection.write( data );
- } );
}
check( tag ) {
@@ -54,9 +33,9 @@
if ( this.tags[tag] ) {
return this.tags[tag];
}
- let response = yield this.request( {
- check: tag
- } );
+ log( `[D] TAG >> ${tag}`, this.silentLog );
+ let response = yield this.post( { check: tag } );
+ log( `[D] TAG << ${tag}`, this.silentLog );
if ( response.status === 'complete' || response.status
=== 'reject' ) {
this.tags[tag] = response.status;
}
@@ -66,15 +45,18 @@
reject( tag ) {
this.tags[tag] = 'reject';
- return this.request( {
- reject: tag
- } );
+ return this.post( { reject: tag } );
}
complete( tag ) {
this.tags[tag] = 'complete';
- return this.request( {
- complete: tag
+ return this.post( { complete: tag } );
+ }
+
+ post( data ) {
+ return request.post( {
+ uri: `http://unix:${this.unixSocketPath}:/tracker`,
+ json: data
} );
}
}
diff --git a/tests/integration/lib/tracker.js b/tests/integration/lib/tracker.js
index ab95c5d..970e887 100644
--- a/tests/integration/lib/tracker.js
+++ b/tests/integration/lib/tracker.js
@@ -1,129 +1,77 @@
/*jshint esversion: 6, node:true */
-/*global console */
+const restify = require('restify'),
+ Promise = require( 'bluebird' ); // jshint ignore:line
-const fs = require( 'fs' ),
- net = require( 'net' );
-
-// Somewhat generic unix socket server. Is there perhaps a package
-// we could source this from instead?
class Server {
- constructor( path ) {
- if ( !this.dispatch ) {
- this.ready = Promise.reject( 'Dispatch not implemented'
);
- return;
- }
+ constructor(options) {
+ this.server = restify.createServer({
+ name: 'tracker',
+ version: '1.0.0',
+ });
- console.log( `Starting server to listen on ${path}` );
- this.ready = new Promise( ( resolve, reject ) => {
- this.unixServer = net.createServer( ( c ) =>
this.onClient( c ) );
- this.unixServer.on( 'error',
this.onInitializationError( reject ) );
- this.unixServer.on( 'listening', this.onListening(
resolve ) );
- this.unixServer.listen( path );
- } );
- }
+ this.unixSocketPath = options.trackerPath;
- onInitializationError( reject ) {
- return ( e ) => {
- console.log( 'Server initialization failed' );
- if ( e.code === 'EADDRINUSE' ) {
- // prevent unlinking the file we never took
ownership of
- this.unixServer.unref();
- this.unixServer = undefined;
- }
- reject( e.code );
+
this.server.use(restify.plugins.acceptParser(this.server.acceptable));
+ this.server.use(restify.plugins.queryParser());
+ this.server.use(restify.plugins.bodyParser());
+
+ const globals = {
+ tags: {},
+ pending: {},
+ resolvers: {},
};
- }
- onListening( resolve ) {
- return () => {
- console.log( 'Server initialized' );
- this.unixServer.on( 'close', this.shutdown );
- // TODO: Are these needed? Or is the 'close' listener
enough?
- process.on( 'SIGTERM', this.shutdown );
- process.on( 'SIGINT', this.shutdown );
- // TODO: Do we need another handler for this?
- this.unixServer.removeListener( 'error',
this.onInitializationError );
- resolve();
- };
- }
+ this.server.post('/tracker', function (req, res, next) {
+ let data = req.body;
- onClient( socket ) {
- socket.on('data', ( data ) => {
- var parsed;
- try {
- parsed = JSON.parse( data );
- } catch ( e ) {
- // Invalid JSON. Ignore? May lead to
- // timeouts on the other end...
- return;
- }
- this.dispatch( parsed ).then( ( response ) => {
- if ( response ) {
- response.requestId = parsed.requestId;
- socket.write( JSON.stringify( response
) );
- }
- } );
- } );
- }
-
- shutdown() {
- console.log( 'Running server shutdown' );
- if ( this.unixServer ) {
- console.log( 'cleaning up' );
- this.unixServer.unref();
- fs.unlinkSync( this.unixServer.address() );
- delete this.unixServer;
- }
- }
-}
-
-// Communication protocol with clients
-class TagTrackerServer extends Server {
- constructor( path ) {
- super( path );
- this.tags = {};
- this.pending = {};
- this.resolvers = {};
- }
-
- dispatch( data ) {
- return new Promise( ( resolve, reject ) => {
- if ( this.resolvers[data.complete] ) {
+ if (globals.resolvers[data.complete]) {
// tag completed, resolve pending
- this.resolvers[data.complete]( {
+ globals.resolvers[data.complete]({
tag: data.complete,
status: 'complete'
- } );
- // Just echo it back. Not used for anything.
- resolve( data );
- } else if ( this.resolvers[data.reject] ) {
- this.resolvers[data.reject]( {
+ });
+ res.send(data);
+ return next();
+ }
+
+ if (globals.resolvers[data.reject]) {
+ globals.resolvers[data.reject]({
tag: data.reject,
status: 'reject'
- } );
- resolve( data );
- } else if ( this.pending[data.check] ) {
- // Another process is initializing this tag.
Wait for it
- // to signal completion
- this.pending[data.check].then( resolve );
- } else if ( data.check ) {
- // New tag
- this.pending[data.check] = new Promise( (
resolve ) => {
- this.resolvers[data.check] = resolve;
- } );
- resolve( {
+ });
+ res.send(data);
+ return next();
+ }
+
+ if (globals.pending[data.check]) {
+ globals.pending[data.check].then(function
(data) {
+ res.send(data);
+ next();
+ });
+ } else if (data.check) {
+ globals.pending[data.check] = new
Promise((resolve) => {
+ globals.resolvers[data.check] = resolve;
+ });
+ res.send({
tag: data.check,
status: 'new'
- } );
+ });
+ return next();
} else {
- console.log( 'Unrecognized tag server request:
', data );
- reject();
+ return next(new Error('Unrecognized tag server
request: ' + JSON.stringify(data)));
}
- } );
+ });
+ }
+
+ close() {
+ this.server.close();
+ }
+
+ start(success) {
+ this.server.listen(this.unixSocketPath, success);
}
}
-// Communication protocol with host process
(() => {
var server;
process.on( 'message', ( msg ) => {
@@ -131,14 +79,24 @@
if ( server ) {
process.send( { error: "Already initialized" }
);
} else {
- server = new TagTrackerServer(
msg.config.trackerPath );
- server.ready.then(
- () => process.send( { initialized:
true } ),
- ( e ) => process.send( { error: e } ) );
+ server = new Server( msg.config );
+ server.server.on( 'error', (err) => {
+ process.send( { error: err.message } );
+ server = undefined;
+ } );
+
+ server.start(
+ () => {
+ console.log( 'Server
initialized' );
+ process.send( {initialized:
true} );
+ }
+ );
}
}
+ // TODO: figure out why the process channel is closed when
cucumber tries to send
+ // the exit signal...
if ( msg.exit && server ) {
- server.shutdown();
+ server.close();
}
} );
} )();
--
To view, visit https://gerrit.wikimedia.org/r/395000
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic0473887c4eb10d3dca3b7cfafabf3ab31654b1d
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/CirrusSearch
Gerrit-Branch: master
Gerrit-Owner: DCausse <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits