Repository: ignite Updated Branches: refs/heads/master 08fff5e71 -> 76daf05fe
IGNITE-9808 Web Console: Refactored sockets caching. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/76daf05f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/76daf05f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/76daf05f Branch: refs/heads/master Commit: 76daf05fef1392bb77c3be77c799df28dbaea7a1 Parents: 08fff5e Author: Andrey Novikov <[email protected]> Authored: Mon Oct 8 10:00:43 2018 +0700 Committer: Alexey Kuznetsov <[email protected]> Committed: Mon Oct 8 10:00:43 2018 +0700 ---------------------------------------------------------------------- modules/web-console/backend/app/agentSocket.js | 20 ++- .../web-console/backend/app/agentsHandler.js | 142 ++++++++----------- .../web-console/backend/app/browsersHandler.js | 51 +++---- modules/web-console/backend/app/configure.js | 22 +-- modules/web-console/backend/app/settings.js | 3 +- 5 files changed, 113 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/agentSocket.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/agentSocket.js b/modules/web-console/backend/app/agentSocket.js index aff62c4..0c7e6b2 100644 --- a/modules/web-console/backend/app/agentSocket.js +++ b/modules/web-console/backend/app/agentSocket.js @@ -38,21 +38,33 @@ module.exports.factory = function() { class AgentSocket { /** * @param {Socket} socket Socket for interaction. + * @param {Object} accounts Active accounts. * @param {Array.<String>} tokens Agent tokens. * @param {String} demoEnabled Demo enabled. */ - constructor(socket, tokens, demoEnabled) { + constructor(socket, accounts, tokens, demoEnabled) { Object.assign(this, { - socket, - tokens, + accounts, cluster: null, demo: { enabled: demoEnabled, browserSockets: [] - } + }, + socket, + tokens }); } + resetToken(oldToken) { + _.pull(this.tokens, oldToken); + + this.emitEvent('agent:reset:token', oldToken) + .then(() => { + if (_.isEmpty(this.tokens) && this.socket.connected) + this.socket.close(); + }); + } + /** * Send event to agent. * http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/agentsHandler.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/agentsHandler.js b/modules/web-console/backend/app/agentsHandler.js index 8d72ded..fd55ac3 100644 --- a/modules/web-console/backend/app/agentsHandler.js +++ b/modules/web-console/backend/app/agentsHandler.js @@ -50,22 +50,22 @@ module.exports.factory = function(settings, mongo, AgentSocket) { this.sockets = new Map(); } - get(token) { - let sockets = this.sockets.get(token); + get(account) { + let sockets = this.sockets.get(account._id.toString()); if (_.isEmpty(sockets)) - this.sockets.set(token, sockets = []); + this.sockets.set(account._id.toString(), sockets = []); return sockets; } /** * @param {AgentSocket} sock - * @param {String} token + * @param {String} account * @return {Array.<AgentSocket>} */ - add(token, sock) { - const sockets = this.get(token); + add(account, sock) { + const sockets = this.get(account); sockets.push(sock); } @@ -75,9 +75,9 @@ module.exports.factory = function(settings, mongo, AgentSocket) { * @return {AgentSocket} */ find(browserSocket) { - const token = browserSocket.request.user.token; + const {_id} = browserSocket.request.user; - const sockets = this.sockets.get(token); + const sockets = this.sockets.get(_id); return _.find(sockets, (sock) => _.includes(sock.demo.browserSockets, browserSocket)); } @@ -228,19 +228,26 @@ module.exports.factory = function(settings, mongo, AgentSocket) { * Link agent with browsers by account. * * @param {Socket} sock + * @param {Array.<mongo.Account>} accounts * @param {Array.<String>} tokens * @param {boolean} demoEnabled * * @private */ - onConnect(sock, tokens, demoEnabled) { - const agentSocket = new AgentSocket(sock, tokens, demoEnabled); + onConnect(sock, accounts, tokens, demoEnabled) { + const agentSocket = new AgentSocket(sock, accounts, tokens, demoEnabled); + + _.forEach(accounts, (account) => { + this._agentSockets.add(account, agentSocket); + + this._browsersHnd.agentStats(account); + }); sock.on('disconnect', () => { - _.forEach(tokens, (token) => { - _.pull(this._agentSockets.get(token), agentSocket); + _.forEach(accounts, (account) => { + _.pull(this._agentSockets.get(account), agentSocket); - this._browsersHnd.agentStats(token); + this._browsersHnd.agentStats(account); }); }); @@ -258,8 +265,8 @@ module.exports.factory = function(settings, mongo, AgentSocket) { if (agentSocket.cluster !== cluster) { agentSocket.cluster = cluster; - _.forEach(tokens, (token) => { - this._browsersHnd.agentStats(token); + _.forEach(accounts, (account) => { + this._browsersHnd.agentStats(account); }); } else { @@ -268,8 +275,8 @@ module.exports.factory = function(settings, mongo, AgentSocket) { if (changed) { cluster.update(top); - _.forEach(tokens, (token) => { - this._browsersHnd.clusterChanged(token, cluster); + _.forEach(accounts, (account) => { + this._browsersHnd.clusterChanged(account, cluster); }); } } @@ -282,16 +289,17 @@ module.exports.factory = function(settings, mongo, AgentSocket) { agentSocket.cluster = null; - _.forEach(tokens, (token) => { - this._browsersHnd.agentStats(token); + _.forEach(accounts, (account) => { + this._browsersHnd.agentStats(account); }); }); - _.forEach(tokens, (token) => { - this._agentSockets.add(token, agentSocket); + return agentSocket; + } - this._browsersHnd.agentStats(token); - }); + getAccounts(tokens) { + return mongo.Account.find({token: {$in: tokens}}, '_id token').lean().exec() + .then((accounts) => ({accounts, activeTokens: _.uniq(_.map(accounts, 'token'))})); } /** @@ -301,17 +309,27 @@ module.exports.factory = function(settings, mongo, AgentSocket) { attach(srv, browsersHnd) { this._browsersHnd = browsersHnd; - if (this.io) - throw 'Agent server already started!'; - this._collectSupportedAgents() .then((supportedAgents) => { this.currentAgent = _.get(supportedAgents, 'current'); + if (this.io) + throw 'Agent server already started!'; + this.io = socketio(srv, {path: '/agents'}); this.io.on('connection', (sock) => { + const sockId = sock.id; + + console.log('Connected agent with socketId: ', sockId); + + sock.on('disconnect', (reason) => { + console.log(`Agent disconnected with [socketId=${sockId}, reason=${reason}]`); + }); + sock.on('agent:auth', ({ver, bt, tokens, disableDemo} = {}, cb) => { + console.log(`Received authentication request [socketId=${sockId}, tokens=${tokens}, ver=${ver}].`); + if (_.isEmpty(tokens)) return cb('Tokens not set. Please reload agent archive or check settings'); @@ -322,32 +340,33 @@ module.exports.factory = function(settings, mongo, AgentSocket) { return cb('You are using an older version of the agent. Please reload agent'); } - return mongo.Account.find({token: {$in: tokens}}, '_id token').lean().exec() - .then((accounts) => { - const activeTokens = _.uniq(_.map(accounts, 'token')); - + return this.getAccounts(tokens) + .then(({accounts, activeTokens}) => { if (_.isEmpty(activeTokens)) return cb(`Failed to authenticate with token(s): ${tokens.join(',')}. Please reload agent archive or check settings`); cb(null, activeTokens); - return this.onConnect(sock, activeTokens, disableDemo); + return this.onConnect(sock, accounts, activeTokens, disableDemo); }) // TODO IGNITE-1379 send error to web master. .catch(() => cb(`Invalid token(s): ${tokens.join(',')}. Please reload agent archive or check settings`)); }); }); + }) + .catch(() => { + console.log('Failed to collect supported agents'); }); } - agent(token, demo, clusterId) { + agent(account, demo, clusterId) { if (!this.io) return Promise.reject(new Error('Agent server not started yet!')); - const socks = this._agentSockets.get(token); + const socks = this._agentSockets.get(account); if (_.isEmpty(socks)) - return Promise.reject(new Error('Failed to find connected agent for this token')); + return Promise.reject(new Error('Failed to find connected agent for this account')); if (demo || _.isNil(clusterId)) return Promise.resolve(_.head(socks)); @@ -360,11 +379,11 @@ module.exports.factory = function(settings, mongo, AgentSocket) { return Promise.resolve(sock); } - agents(token) { + agents(account) { if (!this.io) return Promise.reject(new Error('Agent server not started yet!')); - const socks = this._agentSockets.get(token); + const socks = this._agentSockets.get(account); if (_.isEmpty(socks)) return Promise.reject(new Error('Failed to find connected agent for this token')); @@ -372,61 +391,18 @@ module.exports.factory = function(settings, mongo, AgentSocket) { return Promise.resolve(socks); } - tryStopDemo(browserSocket) { - const agentSock = this._agentSockets.find(browserSocket); - } - - /** - * @param {ObjectId} token - * @param {Socket} browserSock - * @returns {int} Connected agent count. - */ - onBrowserConnect(token, browserSock) { - this.emitAgentsCount(token); - - // If connect from browser with enabled demo. - const demo = browserSock.request._query.IgniteDemoMode === 'true'; - - // Agents where possible to run demo. - const agentSockets = _.filter(this._agentSockets[token], 'demo.enabled'); - - if (demo && _.size(agentSockets)) { - const agentSocket = _.find(agentSockets, (agent) => _.includes(agent.demo.tokens, token)); - - if (agentSocket) - agentSocket.attachToDemoCluster(browserSock); - else - _.head(agentSockets).runDemoCluster(token, [browserSock]); - } - } - - /** - * @param {Socket} browserSock. - */ - onBrowserDisconnect(browserSock) { - const token = browserSock.client.request.user.token; - - this._browserSockets.pull(token, browserSock); - - // If connect from browser with enabled demo. - if (browserSock.request._query.IgniteDemoMode === 'true') - this._agentSockets.find(token, (agent) => _.includes(agent.demo.browserSockets, browserSock)); - - // TODO If latest browser with demo need stop demo cluster on agent. - } - /** * Try stop agent for token if not used by others. * - * @param {String} token + * @param {mongo.Account} account */ - onTokenReset(token) { + onTokenReset(account) { if (_.isNil(this.io)) return; - const sockets = this._agentSockets[token]; + const agentSockets = this._agentSockets.get(account); - _.forEach(sockets, (socket) => socket._sendToAgent('agent:reset:token', token)); + _.forEach(agentSockets, (sock) => sock.resetToken(account.token)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/browsersHandler.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/browsersHandler.js b/modules/web-console/backend/app/browsersHandler.js index d0cd112..a18b467 100644 --- a/modules/web-console/backend/app/browsersHandler.js +++ b/modules/web-console/backend/app/browsersHandler.js @@ -38,14 +38,14 @@ module.exports = { * @param {Socket} sock */ add(sock) { - const token = sock.request.user.token; + const token = sock.request.user._id.toString(); if (this.sockets.has(token)) this.sockets.get(token).push(sock); else this.sockets.set(token, [sock]); - return this.sockets.get(token); + return this.sockets.get(sock.request.user); } /** @@ -61,11 +61,13 @@ module.exports = { return sockets; } - get(token) { - if (this.sockets.has(token)) - return this.sockets.get(token); + get(account) { + let sockets = this.sockets.get(account._id.toString()); - return []; + if (_.isEmpty(sockets)) + this.sockets.set(account._id.toString(), sockets = []); + + return sockets; } demo(token) { @@ -103,11 +105,11 @@ module.exports = { } /** - * @param {String} token + * @param {String} account * @param {Array.<Socket>} [socks] */ - agentStats(token, socks = this._browserSockets.get(token)) { - return this._agentHnd.agents(token) + agentStats(account, socks = this._browserSockets.get(account)) { + return this._agentHnd.agents(account) .then((agentSocks) => { const stat = _.reduce(agentSocks, (acc, agentSock) => { acc.count += 1; @@ -127,8 +129,8 @@ module.exports = { .then((stat) => _.forEach(socks, (sock) => sock.emit('agents:stat', stat))); } - clusterChanged(token, cluster) { - const socks = this._browserSockets.get(token); + clusterChanged(account, cluster) { + const socks = this._browserSockets.get(account); _.forEach(socks, (sock) => sock.emit('cluster:changed', cluster)); } @@ -153,10 +155,10 @@ module.exports = { } } - executeOnAgent(token, demo, event, ...args) { + executeOnAgent(account, demo, event, ...args) { const cb = _.last(args); - return this._agentHnd.agent(token, demo) + return this._agentHnd.agent(account, demo) .then((agentSock) => agentSock.emitEvent(event, ..._.dropRight(args))) .then((res) => cb(null, res)) .catch((err) => cb(this.errorTransformer(err))); @@ -164,21 +166,21 @@ module.exports = { agentListeners(sock) { const demo = sock.request._query.IgniteDemoMode === 'true'; - const token = () => sock.request.user.token; + const account = () => sock.request.user; // Return available drivers to browser. sock.on('schemaImport:drivers', (...args) => { - this.executeOnAgent(token(), demo, 'schemaImport:drivers', ...args); + this.executeOnAgent(account(), demo, 'schemaImport:drivers', ...args); }); // Return schemas from database to browser. sock.on('schemaImport:schemas', (...args) => { - this.executeOnAgent(token(), demo, 'schemaImport:schemas', ...args); + this.executeOnAgent(account(), demo, 'schemaImport:schemas', ...args); }); // Return tables from database to browser. sock.on('schemaImport:metadata', (...args) => { - this.executeOnAgent(token(), demo, 'schemaImport:metadata', ...args); + this.executeOnAgent(account(), demo, 'schemaImport:metadata', ...args); }); } @@ -218,9 +220,7 @@ module.exports = { return cb('Invalid format of message: "node:rest"'); } - const token = sock.request.user.token; - - const agent = this._agentHnd.agent(token, demo, clusterId); + const agent = this._agentHnd.agent(sock.request.user, demo, clusterId); this.executeOnNode(agent, demo, credentials, params) .then((data) => cb(null, data)) @@ -259,8 +259,6 @@ module.exports = { return cb('Invalid format of message: "node:visor"'); } - const token = sock.request.user.token; - const {taskId, nids, args = []} = params; const desc = this._visorTasks.get(taskId); @@ -277,7 +275,7 @@ module.exports = { _.forEach(_.concat(desc.argCls, args), (param, idx) => { exeParams[`p${idx + 3}`] = param; }); - const agent = this._agentHnd.agent(token, demo, clusterId); + const agent = this._agentHnd.agent(sock.request.user, demo, clusterId); this.executeOnNode(agent, demo, credentials, exeParams) .then((data) => { @@ -317,18 +315,13 @@ module.exports = { // Handle browser disconnect event. sock.on('disconnect', () => { this._browserSockets.remove(sock); - - const demo = sock.request._query.IgniteDemoMode === 'true'; - - // Stop demo if latest demo tab for this token. - demo && agentHnd.tryStopDemo(sock); }); this.agentListeners(sock); this.nodeListeners(sock); this.pushInitialData(sock); - this.agentStats(sock.request.user.token, [sock]); + this.agentStats(sock.request.user, [sock]); this.emitNotification(sock); }); }); http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/configure.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/configure.js b/modules/web-console/backend/app/configure.js index b7bdb49..a0e5190 100644 --- a/modules/web-console/backend/app/configure.js +++ b/modules/web-console/backend/app/configure.js @@ -47,8 +47,6 @@ module.exports.factory = function(settings, mongo, apis) { _.forEach(apis, (api) => app.use(api)); - app.use(cookieParser(settings.sessionSecret)); - app.use(bodyParser.json({limit: '50mb'})); app.use(bodyParser.urlencoded({limit: '50mb', extended: true})); @@ -67,18 +65,26 @@ module.exports.factory = function(settings, mongo, apis) { app.use(passport.initialize()); app.use(passport.session()); - passport.serializeUser(mongo.Account.serializeUser()); - passport.deserializeUser(mongo.Account.deserializeUser()); + passport.serializeUser((user, done) => done(null, user._id)); + + passport.deserializeUser((id, done) => { + if (mongo.ObjectId.isValid(id)) + return mongo.Account.findById(id, done); + + // Invalidates the existing login session. + done(null, false); + }); passport.use(mongo.Account.createStrategy()); }, socketio: (io) => { - const _onAuthorizeSuccess = (data, accept) => { - accept(null, true); - }; + const _onAuthorizeSuccess = (data, accept) => accept(); const _onAuthorizeFail = (data, message, error, accept) => { - accept(null, false); + if (error) + accept(new Error(message)); + + return accept(new Error(message)); }; io.use(passportSocketIo.authorize({ http://git-wip-us.apache.org/repos/asf/ignite/blob/76daf05f/modules/web-console/backend/app/settings.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/settings.js b/modules/web-console/backend/app/settings.js index d206107..104b66d 100644 --- a/modules/web-console/backend/app/settings.js +++ b/modules/web-console/backend/app/settings.js @@ -61,7 +61,8 @@ module.exports = { server: { host: nconf.get('server:host') || dfltHost, port: _normalizePort(nconf.get('server:port') || dfltPort), - SSLOptions: nconf.get('server:ssl') && { + // eslint-disable-next-line eqeqeq + SSLOptions: nconf.get('server:ssl') == 'true' && { enable301Redirects: true, trustXFPHeader: true, key: fs.readFileSync(nconf.get('server:key')),
