IGNITE-4995 Multi-cluster support for Web Console.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/323e3870 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/323e3870 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/323e3870 Branch: refs/heads/ignite-4985 Commit: 323e38709d99c08142c54916e7a718895f867be8 Parents: 8c03220 Author: anovikov <[email protected]> Authored: Tue Apr 18 10:38:19 2017 +0700 Committer: Alexey Kuznetsov <[email protected]> Committed: Tue Apr 18 10:38:19 2017 +0700 ---------------------------------------------------------------------- .../JettyRestProcessorAbstractSelfTest.java | 3 +- .../internal/visor/query/VisorQueryArg.java | 16 +- .../internal/visor/query/VisorQueryTask.java | 21 +- modules/web-console/backend/app/agent.js | 926 ------------------- modules/web-console/backend/app/agentSocket.js | 249 +++++ .../web-console/backend/app/agentsHandler.js | 400 ++++++++ modules/web-console/backend/app/apiServer.js | 68 ++ modules/web-console/backend/app/app.js | 63 -- modules/web-console/backend/app/browser.js | 539 ----------- .../web-console/backend/app/browsersHandler.js | 279 ++++++ modules/web-console/backend/app/routes.js | 6 +- modules/web-console/backend/index.js | 44 +- modules/web-console/backend/package.json | 52 +- modules/web-console/backend/routes/agent.js | 57 -- modules/web-console/backend/routes/demo.js | 14 +- modules/web-console/backend/routes/downloads.js | 57 ++ modules/web-console/backend/services/agents.js | 83 -- .../web-console/backend/services/downloads.js | 80 ++ modules/web-console/backend/services/users.js | 10 +- .../web-console/backend/test/app/httpAgent.js | 6 +- .../web-console/backend/test/routes/clusters.js | 4 +- modules/web-console/frontend/app/app.config.js | 1 + modules/web-console/frontend/app/app.js | 12 +- .../components/activities-user-dialog/index.js | 1 - .../cluster-select/cluster-select.controller.js | 55 ++ .../cluster-select/cluster-select.pug | 40 + .../app/components/cluster-select/index.js | 28 + .../input-dialog/input-dialog.service.js | 1 - .../list-of-registered-users.column-defs.js | 2 +- .../app/helpers/jade/form/form-field-text.pug | 19 +- .../frontend/app/helpers/jade/mixins.pug | 6 + .../app/modules/agent/AgentManager.service.js | 529 +++++++++++ .../app/modules/agent/AgentModal.service.js | 89 ++ .../frontend/app/modules/agent/agent.module.js | 347 +------ .../frontend/app/modules/cluster/Cache.js | 51 + .../app/modules/cluster/CacheMetrics.js | 51 + .../frontend/app/modules/cluster/Node.js | 54 ++ .../frontend/app/modules/cluster/NodeMetrics.js | 19 + .../frontend/app/modules/demo/Demo.module.js | 5 +- .../app/modules/dialog/dialog.factory.js | 1 - .../getting-started/GettingStarted.provider.js | 2 +- .../app/modules/navbar/userbar.directive.js | 4 +- .../frontend/app/modules/nodes/Nodes.service.js | 1 - .../app/modules/sql/notebook.controller.js | 2 +- .../frontend/app/modules/sql/sql.controller.js | 49 +- .../frontend/app/modules/states/admin.state.js | 4 +- .../app/modules/states/configuration.state.js | 17 +- .../app/modules/states/profile.state.js | 2 +- .../frontend/app/modules/user/Auth.service.js | 6 +- .../frontend/app/services/Confirm.service.js | 2 +- .../app/services/ConfirmBatch.service.js | 1 - .../frontend/controllers/caches-controller.js | 2 +- .../frontend/controllers/domains-controller.js | 20 +- modules/web-console/frontend/package.json | 62 +- .../stylesheets/_bootstrap-variables.scss | 2 +- .../stylesheets/_font-awesome-custom.scss | 5 + .../frontend/public/stylesheets/style.scss | 57 +- .../frontend/views/includes/header.pug | 23 +- .../views/templates/agent-download.tpl.pug | 39 +- .../frontend/views/templates/demo-info.tpl.pug | 2 +- modules/web-console/web-agent/pom.xml | 16 +- .../console/agent/AgentConfiguration.java | 47 +- .../ignite/console/agent/AgentLauncher.java | 196 ++-- .../apache/ignite/console/agent/AgentUtils.java | 6 +- .../console/agent/handlers/ClusterListener.java | 266 ++++++ .../agent/handlers/DatabaseListener.java | 6 +- .../console/agent/handlers/DemoListener.java | 131 +++ .../console/agent/handlers/RestListener.java | 229 +---- .../ignite/console/agent/rest/RestExecutor.java | 197 ++++ .../ignite/console/agent/rest/RestResult.java | 81 ++ .../ignite/console/demo/AgentClusterDemo.java | 175 ++-- .../ignite/console/demo/AgentDemoUtils.java | 2 +- .../demo/service/DemoCachesLoadService.java | 35 +- 73 files changed, 3346 insertions(+), 2631 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java index cd3011c..c383de0 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java @@ -1408,7 +1408,8 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro ret = content(new VisorGatewayArgument(VisorQueryTask.class) .forNode(locNode) - .argument(VisorQueryArg.class, "person", URLEncoder.encode("select * from Person", CHARSET), false, false, false, 1)); + .argument(VisorQueryArg.class, "person", URLEncoder.encode("select * from Person", CHARSET), + false, false, false, false, 1)); info("VisorQueryTask result: " + ret); http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java index 1cb1f0d..d4eb65a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java @@ -43,6 +43,9 @@ public class VisorQueryArg extends VisorDataTransferObject { /** Enforce join order flag. */ private boolean enforceJoinOrder; + /** Query contains only replicated tables flag.*/ + private boolean replicatedOnly; + /** Flag whether to execute query locally. */ private boolean loc; @@ -61,15 +64,17 @@ public class VisorQueryArg extends VisorDataTransferObject { * @param qryTxt Query text. * @param distributedJoins If {@code true} then distributed joins enabled. * @param enforceJoinOrder If {@code true} then enforce join order. + * @param replicatedOnly {@code true} then query contains only replicated tables. * @param loc Flag whether to execute query locally. * @param pageSize Result batch size. */ public VisorQueryArg(String cacheName, String qryTxt, - boolean distributedJoins, boolean enforceJoinOrder, boolean loc, int pageSize) { + boolean distributedJoins, boolean enforceJoinOrder, boolean replicatedOnly, boolean loc, int pageSize) { this.cacheName = cacheName; this.qryTxt = qryTxt; this.distributedJoins = distributedJoins; this.enforceJoinOrder = enforceJoinOrder; + this.replicatedOnly = replicatedOnly; this.loc = loc; this.pageSize = pageSize; } @@ -103,7 +108,14 @@ public class VisorQueryArg extends VisorDataTransferObject { } /** - * @return {@code true} if query should be executed locally. + * @return {@code true} If the query contains only replicated tables. + */ + public boolean isReplicatedOnly() { + return replicatedOnly; + } + + /** + * @return {@code true} If query should be executed locally. */ public boolean isLocal() { return loc; http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java index 303e6b6..815cf6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java @@ -22,10 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.UUID; -import javax.cache.Cache; import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.query.QueryCursor; -import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.task.GridInternal; @@ -35,7 +32,6 @@ import org.apache.ignite.internal.visor.VisorEither; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorOneNodeTask; import org.apache.ignite.internal.visor.util.VisorExceptionWrapper; -import org.apache.ignite.lang.IgniteBiPredicate; import static org.apache.ignite.internal.visor.query.VisorQueryUtils.SQL_QRY_NAME; import static org.apache.ignite.internal.visor.query.VisorQueryUtils.fetchSqlQueryRows; @@ -71,22 +67,6 @@ public class VisorQueryTask extends VisorOneNodeTask<VisorQueryArg, VisorEither< super(arg, debug); } - /** - * Execute scan query. - * - * @param c Cache to scan. - * @param arg Job argument with query parameters. - * @return Query cursor. - */ - private QueryCursor<Cache.Entry<Object, Object>> scan(IgniteCache<Object, Object> c, VisorQueryArg arg, - IgniteBiPredicate<Object, Object> filter) { - ScanQuery<Object, Object> qry = new ScanQuery<>(filter); - qry.setPageSize(arg.getPageSize()); - qry.setLocal(arg.isLocal()); - - return c.withKeepBinary().query(qry); - } - /** {@inheritDoc} */ @Override protected VisorEither<VisorQueryResult> run(final VisorQueryArg arg) { try { @@ -98,6 +78,7 @@ public class VisorQueryTask extends VisorOneNodeTask<VisorQueryArg, VisorEither< qry.setLocal(arg.isLocal()); qry.setDistributedJoins(arg.isDistributedJoins()); qry.setEnforceJoinOrder(arg.isEnforceJoinOrder()); + qry.setReplicatedOnly(arg.isReplicatedOnly()); long start = U.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/backend/app/agent.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/agent.js b/modules/web-console/backend/app/agent.js deleted file mode 100644 index 758d31b..0000000 --- a/modules/web-console/backend/app/agent.js +++ /dev/null @@ -1,926 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -'use strict'; - -// Fire me up! - -/** - * Module interaction with agents. - */ -module.exports = { - implements: 'agent-manager', - inject: ['require(lodash)', 'require(fs)', 'require(path)', 'require(jszip)', 'require(socket.io)', 'settings', 'mongo', 'services/activities'] -}; - -/** - * @param _ - * @param fs - * @param path - * @param JSZip - * @param socketio - * @param settings - * @param mongo - * @param {ActivitiesService} activitiesService - * @returns {AgentManager} - */ -module.exports.factory = function(_, fs, path, JSZip, socketio, settings, mongo, activitiesService) { - /** - * - */ - class Command { - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} name Command name. - */ - constructor(demo, name) { - this._demo = demo; - - /** - * Command name. - * @type {String} - */ - this._name = name; - - /** - * Command parameters. - * @type {Array.<String>} - */ - this._params = []; - } - - /** - * Add parameter to command. - * @param {string} key Parameter key. - * @param {Object} value Parameter value. - * @returns {Command} - */ - addParam(key, value) { - this._params.push({key, value}); - - return this; - } - } - - /** - * Connected agent descriptor. - */ - class Agent { - /** - * @param {socketIo.Socket} socket - Agent socket for interaction. - */ - constructor(socket) { - /** - * Agent socket for interaction. - * - * @type {socketIo.Socket} - * @private - */ - this._socket = socket; - } - - /** - * Send message to agent. - * - * @this {Agent} - * @param {String} event Command name. - * @param {Object} data Command params. - * @param {Function} [callback] on finish - */ - _emit(event, data, callback) { - if (!this._socket.connected) { - if (callback) - callback('org.apache.ignite.agent.AgentException: Connection is closed'); - - return; - } - - this._socket.emit(event, data, callback); - } - - /** - * Send message to agent. - * - * @param {String} event - Event name. - * @param {Object?} data - Transmitted data. - * @returns {Promise} - */ - executeAgent(event, data) { - return new Promise((resolve, reject) => - this._emit(event, data, (error, res) => { - if (error) - return reject(error); - - resolve(res); - }) - ); - } - - /** - * Execute rest request on node. - * - * @param {Command} cmd - REST command. - * @return {Promise} - */ - executeRest(cmd) { - const params = {cmd: cmd._name}; - - for (const param of cmd._params) - params[param.key] = param.value; - - return new Promise((resolve, reject) => { - this._emit('node:rest', {uri: 'ignite', params, demo: cmd._demo, method: 'GET'}, (error, res) => { - if (error) - return reject(new Error(error)); - - error = res.error; - - const code = res.code; - - if (code === 401) - return reject(new Error('Agent failed to authenticate in grid. Please check agent\'s login and password or node port.')); - - if (code !== 200) - return reject(new Error(error || 'Failed connect to node and execute REST command.')); - - try { - const msg = JSON.parse(res.data); - - if (msg.successStatus === 0) - return resolve(msg.response); - - if (msg.successStatus === 2) - return reject(new Error('Agent failed to authenticate in grid. Please check agent\'s login and password or node port.')); - - reject(new Error(msg.error)); - } - catch (e) { - return reject(e); - } - }); - }); - } - - /** - * @param {String} driverPath - * @param {String} driverClass - * @param {String} url - * @param {Object} info - * @returns {Promise} Promise on list of tables (see org.apache.ignite.schema.parser.DbTable java class) - */ - metadataSchemas(driverPath, driverClass, url, info) { - return this.executeAgent('schemaImport:schemas', {driverPath, driverClass, url, info}); - } - - /** - * @param {String} driverPath - * @param {String} driverClass - * @param {String} url - * @param {Object} info - * @param {Array} schemas - * @param {Boolean} tablesOnly - * @returns {Promise} Promise on list of tables (see org.apache.ignite.schema.parser.DbTable java class) - */ - metadataTables(driverPath, driverClass, url, info, schemas, tablesOnly) { - return this.executeAgent('schemaImport:metadata', {driverPath, driverClass, url, info, schemas, tablesOnly}); - } - - /** - * @returns {Promise} Promise on list of jars from driver folder. - */ - availableDrivers() { - return this.executeAgent('schemaImport:drivers'); - } - - /** - * - * @param {Boolean} demo Is need run command on demo node. - * @param {Boolean} attr Get attributes, if this parameter has value true. Default value: true. - * @param {Boolean} mtr Get metrics, if this parameter has value true. Default value: false. - * @returns {Promise} - */ - topology(demo, attr, mtr) { - const cmd = new Command(demo, 'top') - .addParam('attr', attr !== false) - .addParam('mtr', !!mtr); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nid Node id. - * @param {String} cacheName Cache name. - * @param {String} query Query. - * @param {Boolean} nonCollocatedJoins Flag whether to execute non collocated joins. - * @param {Boolean} enforceJoinOrder Flag whether enforce join order is enabled. - * @param {Boolean} local Flag whether to execute query locally. - * @param {int} pageSize Page size. - * @returns {Promise} - */ - fieldsQuery(demo, nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, local, pageSize) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nid) - .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorQueryTask') - .addParam('p3', 'org.apache.ignite.internal.visor.query.VisorQueryArg') - .addParam('p4', cacheName) - .addParam('p5', query) - .addParam('p6', nonCollocatedJoins) - .addParam('p7', enforceJoinOrder) - .addParam('p8', local) - .addParam('p9', pageSize); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nid Node id. - * @param {String} cacheName Cache name. - * @param {String} filter Filter text. - * @param {Boolean} regEx Flag whether filter by regexp. - * @param {Boolean} caseSensitive Case sensitive filtration. - * @param {Boolean} near Scan near cache. - * @param {Boolean} local Flag whether to execute query locally. - * @param {int} pageSize Page size. - * @returns {Promise} - */ - queryScan(demo, nid, cacheName, filter, regEx, caseSensitive, near, local, pageSize) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nid) - .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorScanQueryTask') - .addParam('p3', 'org.apache.ignite.internal.visor.query.VisorScanQueryArg') - .addParam('p4', cacheName) - .addParam('p5', filter) - .addParam('p6', regEx) - .addParam('p7', caseSensitive) - .addParam('p8', near) - .addParam('p9', local) - .addParam('p10', pageSize); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nid Node id. - * @param {int} queryId Query Id. - * @param {int} pageSize Page size. - * @returns {Promise} - */ - queryFetch(demo, nid, queryId, pageSize) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nid) - .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorQueryNextPageTask') - .addParam('p3', 'org.apache.ignite.lang.VisorQueryNextPageTaskArg') - .addParam('p4', 'java.lang.String') - .addParam('p5', 'java.lang.Integer') - .addParam('p6', queryId) - .addParam('p7', pageSize); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nid Node id. - * @param {int} queryId Query Id. - * @returns {Promise} - */ - queryClose(demo, nid, queryId) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', '') - .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorQueryCleanupTask') - .addParam('p3', 'java.util.Map') - .addParam('p4', 'java.util.UUID') - .addParam('p5', 'java.util.Set') - .addParam('p6', `${nid}=${queryId}`); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {Array.<String>} nids Node ids. - * @param {Number} since Metrics since. - * @returns {Promise} - */ - queryDetailMetrics(demo, nids, since) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nids) - .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorQueryDetailMetricsCollectorTask') - .addParam('p3', 'java.lang.Long') - .addParam('p4', since); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {Array.<String>} nids Node ids. - * @returns {Promise} - */ - queryResetDetailMetrics(demo, nids) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nids) - .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorQueryResetDetailMetricsTask') - .addParam('p3', 'java.lang.Void'); - - return this.executeRest(cmd); - } - - /** - * Collect running queries - * @param {Boolean} demo Is need run command on demo node. - * @param {Number} duration minimum duration time of running queries. - * @returns {Promise} - */ - queryCollectRunning(demo, duration) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', '') - .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorRunningQueriesCollectorTask') - .addParam('p3', 'java.lang.Long') - .addParam('p4', duration); - - return this.executeRest(cmd); - } - - /** - * Cancel running query. - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nid Node id. - * @param {Number} queryId query id to cancel. - * @returns {Promise} - */ - queryCancel(demo, nid, queryId) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nid) - .addParam('p2', 'org.apache.ignite.internal.visor.query.VisorQueryCancelTask') - .addParam('p3', 'java.lang.Long') - .addParam('p4', queryId); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} cacheName Cache name. - * @returns {Promise} - */ - metadata(demo, cacheName) { - const cmd = new Command(demo, 'metadata') - .addParam('cacheName', cacheName); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} evtOrderKey Event order key, unique for tab instance. - * @param {String} evtThrottleCntrKey Event throttle counter key, unique for tab instance. - * @returns {Promise} - */ - collect(demo, evtOrderKey, evtThrottleCntrKey) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', '') - .addParam('p2', 'org.apache.ignite.internal.visor.node.VisorNodeDataCollectorTask') - .addParam('p3', 'org.apache.ignite.internal.visor.node.VisorNodeDataCollectorTaskArg') - .addParam('p4', true) - .addParam('p5', 'CONSOLE_' + evtOrderKey) - .addParam('p6', evtThrottleCntrKey) - .addParam('p7', 10) - .addParam('p8', false); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nid Node id. - * @returns {Promise} - */ - collectNodeConfiguration(demo, nid) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nid) - .addParam('p2', 'org.apache.ignite.internal.visor.node.VisorNodeConfigurationCollectorTask') - .addParam('p3', 'java.lang.Void') - .addParam('p4', null); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nid Node id. - * @param {Array.<String>} caches Caches deployment IDs to collect configuration. - * @returns {Promise} - */ - collectCacheConfigurations(demo, nid, caches) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nid) - .addParam('p2', 'org.apache.ignite.internal.visor.cache.VisorCacheConfigurationCollectorTask') - .addParam('p3', 'java.util.Collection') - .addParam('p4', 'org.apache.ignite.lang.IgniteUuid') - .addParam('p5', caches); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nid Node id. - * @param {String} cacheName Cache name. - * @returns {Promise} - */ - cacheClear(demo, nid, cacheName) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nid) - .addParam('p2', 'org.apache.ignite.internal.visor.cache.VisorCacheClearTask') - .addParam('p3', 'java.lang.String') - .addParam('p4', cacheName); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {Array.<String>} nids Node ids. - * @param {Boolean} near true if near cache should be started. - * @param {String} cacheName Name for near cache. - * @param {String} cfg Cache XML configuration. - * @returns {Promise} - */ - cacheStart(demo, nids, near, cacheName, cfg) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nids) - .addParam('p2', 'org.apache.ignite.internal.visor.cache.VisorCacheStartTask') - .addParam('p3', 'org.apache.ignite.internal.visor.cache.VisorCacheStartArg') - .addParam('p4', near) - .addParam('p5', cacheName) - .addParam('p6', cfg); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nid Node id. - * @param {String} cacheName Cache name. - * @returns {Promise} - */ - cacheStop(demo, nid, cacheName) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nid) - .addParam('p2', 'org.apache.ignite.internal.visor.cache.VisorCacheStopTask') - .addParam('p3', 'java.lang.String') - .addParam('p4', cacheName); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nid Node id. - * @param {String} cacheName Cache name. - * @returns {Promise} - */ - cacheResetMetrics(demo, nid, cacheName) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nid) - .addParam('p2', 'org.apache.ignite.internal.visor.cache.VisorCacheResetMetricsTask') - .addParam('p3', 'java.lang.String') - .addParam('p4', cacheName); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nids Node ids. - * @returns {Promise} - */ - gc(demo, nids) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nids) - .addParam('p2', 'org.apache.ignite.internal.visor.node.VisorNodeGcTask') - .addParam('p3', 'java.lang.Void'); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} taskNid node that is not node we want to ping. - * @param {String} nid Id of the node to ping. - * @returns {Promise} - */ - ping(demo, taskNid, nid) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', taskNid) - .addParam('p2', 'org.apache.ignite.internal.visor.node.VisorNodePingTask') - .addParam('p3', 'java.util.UUID') - .addParam('p4', nid); - - return this.executeRest(cmd); - } - - /** - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nid Id of the node to get thread dump. - * @returns {Promise} - */ - threadDump(demo, nid) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nid) - .addParam('p2', 'org.apache.ignite.internal.visor.debug.VisorThreadDumpTask') - .addParam('p3', 'java.lang.Void'); - - return this.executeRest(cmd); - } - - /** - * Collect cache partitions. - * @param {Boolean} demo Is need run command on demo node. - * @param {Array.<String>} nids Cache node IDs. - * @param {String} cacheName Cache name. - * @returns {Promise} - */ - partitions(demo, nids, cacheName) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nids) - .addParam('p2', 'org.apache.ignite.internal.visor.cache.VisorCachePartitionsTask') - .addParam('p3', 'java.lang.String') - .addParam('p4', cacheName); - - return this.executeRest(cmd); - } - - /** - * Stops given node IDs. - * @param {Boolean} demo Is need run command on demo node. - * @param {Array.<String>} nids Nodes IDs. - * @returns {Promise} - */ - stopNodes(demo, nids) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nids) - .addParam('p2', 'org.apache.ignite.internal.visor.node.VisorNodeStopTask') - .addParam('p3', 'java.lang.Void'); - - return this.executeRest(cmd); - } - - /** - * Restarts given node IDs. - * @param {Boolean} demo Is need run command on demo node. - * @param {Array.<String>} nids Nodes IDs. - * @returns {Promise} - */ - restartNodes(demo, nids) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nids) - .addParam('p2', 'org.apache.ignite.internal.visor.node.VisorNodeRestartTask') - .addParam('p3', 'java.lang.Void'); - - return this.executeRest(cmd); - } - - /** - * Collect service information. - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nid Node ID. - * @returns {Promise} - */ - services(demo, nid) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nid) - .addParam('p2', 'org.apache.ignite.internal.visor.service.VisorServiceTask') - .addParam('p3', 'java.lang.Void'); - - return this.executeRest(cmd); - } - - /** - * Cancel service with specified name. - * @param {Boolean} demo Is need run command on demo node. - * @param {String} nid Node ID. - * @param {String} name Name of service to cancel. - * @returns {Promise} - */ - serviceCancel(demo, nid, name) { - const cmd = new Command(demo, 'exe') - .addParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') - .addParam('p1', nid) - .addParam('p2', 'org.apache.ignite.internal.visor.service.VisorCancelServiceTask') - .addParam('p3', 'java.lang.String') - .addParam('p4', name); - - return this.executeRest(cmd); - } - } - - /** - * Connected agents manager. - */ - class AgentManager { - /** - * @constructor - */ - constructor() { - /** - * Connected agents by user id. - * @type {Object.<ObjectId, Array.<Agent>>} - */ - this._agents = {}; - - /** - * Connected browsers by user id. - * @type {Object.<ObjectId, Array.<Socket>>} - */ - this._browsers = {}; - - const agentArchives = fs.readdirSync(settings.agent.dists) - .filter((file) => path.extname(file) === '.zip'); - - /** - * Supported agents distribution. - * @type {Object.<String, String>} - */ - this.supportedAgents = {}; - - const jarFilter = (file) => path.extname(file) === '.jar'; - - const agentsPromises = _.map(agentArchives, (fileName) => { - const filePath = path.join(settings.agent.dists, fileName); - - return JSZip.loadAsync(fs.readFileSync(filePath)) - .then((zip) => { - const jarPath = _.find(_.keys(zip.files), jarFilter); - - return JSZip.loadAsync(zip.files[jarPath].async('nodebuffer')) - .then((jar) => jar.files['META-INF/MANIFEST.MF'].async('string')) - .then((lines) => lines.trim() - .split(/\s*\n+\s*/) - .map((line, r) => { - r = line.split(/\s*:\s*/); - - this[r[0]] = r[1]; - - return this; - }, {})[0]) - .then((manifest) => { - const ver = manifest['Implementation-Version']; - const buildTime = manifest['Build-Time']; - - if (ver && buildTime) - return { fileName, filePath, ver, buildTime }; - }); - }); - }); - - Promise.all(agentsPromises) - .then((agents) => { - this.supportedAgents = _.keyBy(_.remove(agents, null), 'ver'); - - const latest = _.head(Object.keys(this.supportedAgents).sort((a, b) => { - const aParts = a.split('.'); - const bParts = b.split('.'); - - for (let i = 0; i < aParts.length; ++i) { - if (aParts[i] !== bParts[i]) - return aParts[i] < bParts[i] ? 1 : -1; - } - - if (aParts.length === bParts.length) - return 0; - - return aParts.length < bParts.length ? 1 : -1; - })); - - // Latest version of agent distribution. - if (latest) - this.supportedAgents.latest = this.supportedAgents[latest]; - }); - } - - attachLegacy(srv) { - /** - * @type {socketIo.Server} - */ - const io = socketio(srv); - - io.on('connection', (socket) => { - socket.on('agent:auth', (data, cb) => { - return cb('You are using an older version of the agent. Please reload agent archive'); - }); - }); - } - - /** - * @param {http.Server|https.Server} srv Server instance that we want to attach agent handler. - */ - attach(srv) { - if (this._server) - throw 'Agent server already started!'; - - this._server = srv; - - /** - * @type {socketIo.Server} - */ - this._socket = socketio(this._server, {path: '/agents'}); - - this._socket.on('connection', (socket) => { - socket.on('agent:auth', (data, cb) => { - if (!_.isEmpty(this.supportedAgents)) { - const ver = data.ver; - const bt = data.bt; - - if (_.isEmpty(ver) || _.isEmpty(bt) || _.isEmpty(this.supportedAgents[ver]) || - this.supportedAgents[ver].buildTime > bt) - return cb('You are using an older version of the agent. Please reload agent archive'); - } - - const tokens = data.tokens; - - mongo.Account.find({token: {$in: tokens}}, '_id token').lean().exec() - .then((accounts) => { - if (!accounts.length) - return cb('Agent is failed to authenticate. Please check agent\'s token(s)'); - - const agent = new Agent(socket); - - const accountIds = _.map(accounts, (account) => account._id); - - socket.on('disconnect', () => this._agentDisconnected(accountIds, agent)); - - this._agentConnected(accountIds, agent); - - const missedTokens = _.difference(tokens, _.map(accounts, (account) => account.token)); - - if (missedTokens.length) { - agent._emit('agent:warning', - `Failed to authenticate with token(s): ${missedTokens.join(', ')}.`); - } - - cb(); - }) - // TODO IGNITE-1379 send error to web master. - .catch(() => cb('Agent is failed to authenticate. Please check agent\'s tokens')); - }); - }); - } - - /** - * @param {ObjectId} accountId - * @param {Socket} socket - * @returns {int} Connected agent count. - */ - addAgentListener(accountId, socket) { - let sockets = this._browsers[accountId]; - - if (!sockets) - this._browsers[accountId] = sockets = []; - - sockets.push(socket); - - const agents = this._agents[accountId]; - - return agents ? agents.length : 0; - } - - /** - * @param {ObjectId} accountId. - * @param {Socket} socket. - * @returns {int} connected agent count. - */ - removeAgentListener(accountId, socket) { - const sockets = this._browsers[accountId]; - - _.pull(sockets, socket); - } - - /** - * @param {ObjectId} accountId - * @returns {Promise.<Agent>} - */ - findAgent(accountId) { - if (!this._server) - return Promise.reject(new Error('Agent server not started yet!')); - - const agents = this._agents[accountId]; - - if (!agents || agents.length === 0) - return Promise.reject(new Error('Failed to connect to agent')); - - return Promise.resolve(agents[0]); - } - - /** - * Close connections for all user agents. - * @param {ObjectId} accountId - * @param {String} oldToken - */ - close(accountId, oldToken) { - if (!this._server) - return; - - const agentsForClose = this._agents[accountId]; - - const agentsForWarning = _.clone(agentsForClose); - - this._agents[accountId] = []; - - _.forEach(this._agents, (sockets) => _.pullAll(agentsForClose, sockets)); - - _.pullAll(agentsForWarning, agentsForClose); - - const msg = `Security token has been reset: ${oldToken}`; - - _.forEach(agentsForWarning, (socket) => socket._emit('agent:warning', msg)); - - _.forEach(agentsForClose, (socket) => socket._emit('agent:close', msg)); - - _.forEach(this._browsers[accountId], (socket) => socket.emit('agent:count', {count: 0})); - } - - /** - * @param {ObjectId} accountIds - * @param {Agent} agent - */ - _agentConnected(accountIds, agent) { - _.forEach(accountIds, (accountId) => { - let agents = this._agents[accountId]; - - if (!agents) - this._agents[accountId] = agents = []; - - agents.push(agent); - - const sockets = this._browsers[accountId]; - - _.forEach(sockets, (socket) => socket.emit('agent:count', {count: agents.length})); - - activitiesService.merge(accountId, { - group: 'agent', - action: '/agent/start' - }); - }); - } - - /** - * @param {ObjectId} accountIds - * @param {Agent} agent - */ - _agentDisconnected(accountIds, agent) { - _.forEach(accountIds, (accountId) => { - const agents = this._agents[accountId]; - - if (agents && agents.length) - _.pull(agents, agent); - - const sockets = this._browsers[accountId]; - - _.forEach(sockets, (socket) => socket.emit('agent:count', {count: agents.length})); - }); - } - } - - return new AgentManager(); -}; http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/backend/app/agentSocket.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/agentSocket.js b/modules/web-console/backend/app/agentSocket.js new file mode 100644 index 0000000..db1deaa --- /dev/null +++ b/modules/web-console/backend/app/agentSocket.js @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +// Fire me up! + +/** + * Module interaction with agents. + */ +module.exports = { + implements: 'agent-socket', + inject: ['require(lodash)'] +}; + +/** + * Helper class to contract REST command. + */ +class Command { + /** + * @param {Boolean} demo Is need run command on demo node. + * @param {String} name Command name. + */ + constructor(demo, name) { + this.demo = demo; + + /** + * Command name. + * @type {String} + */ + this._name = name; + + /** + * Command parameters. + * @type {Array.<Object.<String, String>>} + */ + this._params = []; + + this._paramsLastIdx = 1; + } + + /** + * Add parameter to command. + * @param {Object} value Parameter value. + * @returns {Command} + */ + addParam(value) { + this._params.push({key: `p${this._paramsLastIdx++}`, value}); + + return this; + } + + /** + * Add parameter to command. + * @param {String} key Parameter key. + * @param {Object} value Parameter value. + * @returns {Command} + */ + addNamedParam(key, value) { + this._params.push({key, value}); + + return this; + } +} + +/** + * @param _ + * @returns {AgentSocket} + */ +module.exports.factory = function(_) { + /** + * Connected agent descriptor. + */ + class AgentSocket { + /** + * @param {Socket} socket Socket for interaction. + * @param {String} tokens Active tokens. + * @param {String} demoEnabled Demo enabled. + */ + constructor(socket, tokens, demoEnabled) { + Object.assign(this, { + socket, + tokens, + cluster: null, + demo: { + enabled: demoEnabled, + browserSockets: [] + } + }); + } + + /** + * Send event to agent. + * + * @this {AgentSocket} + * @param {String} event Command name. + * @param {Array.<Object>} args - Transmitted arguments. + * @param {Function} [callback] on finish + */ + _emit(event, args, callback) { + if (!this.socket.connected) { + if (callback) + callback('org.apache.ignite.agent.AgentException: Connection is closed'); + + return; + } + + this.socket.emit(event, ...args, callback); + } + + /** + * Send event to agent. + * + * @param {String} event - Event name. + * @param {Array.<Object>?} args - Transmitted arguments. + * @returns {Promise} + */ + emitEvent(event, ...args) { + return new Promise((resolve, reject) => + this._emit(event, args, (error, res) => { + if (error) + return reject(error); + + resolve(res); + }) + ); + } + + restResultParse(res) { + if (res.status === 0) + return JSON.parse(res.data); + + if (res.status === 2) + throw new Error('AgentSocket failed to authenticate in grid. Please check agent\'s login and password or node port.'); + + throw new Error(res.error); + } + + /** + * @param {String} token + * @param {Array.<Socket>} browserSockets + */ + runDemoCluster(token, browserSockets) { + this.emitEvent('demo:broadcast:start') + .then(() => { + this.demo.tokens.push(token); + this.demo.browserSockets.push(...browserSockets); + + this.socket.on('demo:topology', (res) => { + try { + const top = this.restResultParse(res); + + _.forEach(this.demo.browserSockets, (sock) => sock.emit('topology', top)); + } catch (err) { + _.forEach(this.demo.browserSockets, (sock) => sock.emit('topology:err', err)); + } + }); + }); + } + + /** + * @param {Socket} browserSocket + */ + attachToDemoCluster(browserSocket) { + this.demo.browserSockets.push(...browserSocket); + } + + startCollectTopology(timeout) { + return this.emitEvent('start:collect:topology', timeout); + } + + stopCollectTopology(demo) { + return this.emitEvent('stop:collect:topology', demo); + } + + /** + * Execute REST request on node. + * + * @param {Boolean} demo Is need run command on demo node. + * @param {String} cmd REST command. + * @param {Array.<String>} args - REST command arguments. + * @return {Promise} + */ + restCommand(demo, cmd, ...args) { + const params = {cmd}; + + _.forEach(args, (arg, idx) => { + params[`p${idx + 1}`] = args[idx]; + }); + + return this.emitEvent('node:rest', {uri: 'ignite', demo, params, method: 'GET'}) + .then(this.restResultParse); + } + + gatewayCommand(demo, nids, taskCls, argCls, ...args) { + const cmd = new Command(demo, 'exe') + .addNamedParam('name', 'org.apache.ignite.internal.visor.compute.VisorGatewayTask') + .addParam(nids) + .addParam(taskCls) + .addParam(argCls); + + _.forEach(args, (arg) => cmd.addParam(arg)); + + return this.restCommand(cmd); + } + + /** + * @param {Boolean} demo Is need run command on demo node. + * @param {Boolean} attr Get attributes, if this parameter has value true. Default value: true. + * @param {Boolean} mtr Get metrics, if this parameter has value true. Default value: false. + * @returns {Promise} + */ + topology(demo, attr, mtr) { + const cmd = new Command(demo, 'top') + .addNamedParam('attr', attr !== false) + .addNamedParam('mtr', !!mtr); + + return this.restCommand(cmd); + } + + /** + * @param {Boolean} demo Is need run command on demo node. + * @param {String} cacheName Cache name. + * @returns {Promise} + */ + metadata(demo, cacheName) { + const cmd = new Command(demo, 'metadata') + .addNamedParam('cacheName', cacheName); + + return this.restCommand(cmd); + } + } + + return AgentSocket; +}; http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/backend/app/agentsHandler.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/agentsHandler.js b/modules/web-console/backend/app/agentsHandler.js new file mode 100644 index 0000000..d24cef8 --- /dev/null +++ b/modules/web-console/backend/app/agentsHandler.js @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +// Fire me up! + +/** + * Module interaction with agents. + */ +module.exports = { + implements: 'agents-handler', + inject: ['require(lodash)', 'require(fs)', 'require(path)', 'require(jszip)', 'require(socket.io)', 'settings', 'mongo', 'agent-socket'] +}; + +/** + * @param _ + * @param fs + * @param path + * @param JSZip + * @param socketio + * @param settings + * @param mongo + * @param {AgentSocket} AgentSocket + * @returns {AgentsHandler} + */ +module.exports.factory = function(_, fs, path, JSZip, socketio, settings, mongo, AgentSocket) { + class AgentSockets { + constructor() { + /** + * @type {Map.<String, Array.<String>>} + */ + this.sockets = new Map(); + } + + get(token) { + let sockets = this.sockets.get(token); + + if (_.isEmpty(sockets)) + this.sockets.set(token, sockets = []); + + return sockets; + } + + /** + * @param {AgentSocket} sock + * @param {String} token + * @return {Array.<AgentSocket>} + */ + add(token, sock) { + const sockets = this.get(token); + + sockets.push(sock); + } + + /** + * @param {Socket} browserSocket + * @return {AgentSocket} + */ + find(browserSocket) { + const token = browserSocket.request.user.token; + + const sockets = this.sockets.get(token); + + return _.find(sockets, (sock) => _.includes(sock.demo.browserSockets, browserSocket)); + } + } + + class Cluster { + constructor(nids) { + let d = new Date().getTime(); + + this.id = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => { + const r = (d + Math.random() * 16) % 16 | 0; + + d = Math.floor(d / 16); + + return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16); + }); + + this.nids = nids; + } + + same(nids) { + return _.intersection(this.nids, nids).length > 0; + } + + updateTopology(nids) { + this.nids = nids; + } + } + + /** + * Connected agents manager. + */ + class AgentsHandler { + /** + * @constructor + */ + constructor() { + /** + * Connected agents. + * @type {AgentSockets} + */ + this._agentSockets = new AgentSockets(); + + this.clusters = []; + } + + /** + * Collect supported agents list. + * @private + */ + _collectSupportedAgents() { + const jarFilter = (file) => path.extname(file) === '.jar'; + + const agentArchives = fs.readdirSync(settings.agent.dists) + .filter((file) => path.extname(file) === '.zip'); + + const agentsPromises = _.map(agentArchives, (fileName) => { + const filePath = path.join(settings.agent.dists, fileName); + + return JSZip.loadAsync(fs.readFileSync(filePath)) + .then((zip) => { + const jarPath = _.find(_.keys(zip.files), jarFilter); + + return JSZip.loadAsync(zip.files[jarPath].async('nodebuffer')) + .then((jar) => jar.files['META-INF/MANIFEST.MF'].async('string')) + .then((lines) => + _.reduce(lines.split(/\s*\n+\s*/), (acc, line) => { + if (!_.isEmpty(line)) { + const arr = line.split(/\s*:\s*/); + + acc[arr[0]] = arr[1]; + } + + return acc; + }, {})) + .then((manifest) => { + const ver = manifest['Implementation-Version']; + const buildTime = manifest['Build-Time']; + + if (ver && buildTime) + return { fileName, filePath, ver, buildTime }; + }); + }); + }); + + return Promise.all(agentsPromises) + .then((descs) => { + const agentDescs = _.keyBy(_.remove(descs, null), 'ver'); + + const latestVer = _.head(Object.keys(agentDescs).sort((a, b) => { + const aParts = a.split('.'); + const bParts = b.split('.'); + + for (let i = 0; i < aParts.length; ++i) { + if (aParts[i] !== bParts[i]) + return aParts[i] < bParts[i] ? 1 : -1; + } + + if (aParts.length === bParts.length) + return 0; + + return aParts.length < bParts.length ? 1 : -1; + })); + + // Latest version of agent distribution. + if (latestVer) + agentDescs.current = agentDescs[latestVer]; + + return agentDescs; + }); + } + + getOrCreateCluster(nids) { + const cluster = _.find(this.clusters, (c) => c.same(nids)); + + if (_.isNil(cluster)) + this.clusters.push(new Cluster(nids)); + + return cluster; + } + + /** + * Link agent with browsers by account. + * + * @param {Socket} sock + * @param {Array.<String>} tokens + * @param {boolean} demoEnabled + * + * @private + */ + onConnect(sock, tokens, demoEnabled) { + const agentSocket = new AgentSocket(sock, tokens, demoEnabled); + + sock.on('disconnect', () => { + _.forEach(tokens, (token) => { + _.pull(this._agentSockets.get(token), agentSocket); + + this._browsersHnd.agentStats(token); + }); + }); + + sock.on('cluster:topology', (nids) => { + const cluster = this.getOrCreateCluster(nids); + + if (_.isNil(agentSocket.cluster)) { + agentSocket.cluster = cluster; + + _.forEach(tokens, (token) => { + this._browsersHnd.agentStats(token); + }); + } + else + cluster.updateTopology(nids); + }); + + sock.on('cluster:collector', (top) => { + + }); + + sock.on('cluster:disconnected', () => { + agentSocket.cluster = null; + + _.forEach(tokens, (token) => { + this._browsersHnd.agentStats(token); + }); + }); + + _.forEach(tokens, (token) => { + this._agentSockets.add(token, agentSocket); + + // TODO start demo if needed. + // const browserSockets = _.filter(this._browserSockets[token], 'request._query.IgniteDemoMode'); + // + // // First agent join after user start demo. + // if (_.size(browserSockets)) + // agentSocket.runDemoCluster(token, browserSockets); + + this._browsersHnd.agentStats(token); + }); + + // ioSocket.on('cluster:topology', (top) => { + // + // }); + } + + /** + * @param {http.Server|https.Server} srv Server instance that we want to attach agent handler. + * @param {BrowsersHandler} browsersHnd + */ + attach(srv, browsersHnd) { + this._browsersHnd = browsersHnd; + + if (this.io) + throw 'Agent server already started!'; + + this._collectSupportedAgents() + .then((supportedAgents) => { + this.currentAgent = _.get(supportedAgents, 'current'); + + this.io = socketio(srv, {path: '/agents'}); + + this.io.on('connection', (sock) => { + sock.on('agent:auth', ({ver, bt, tokens, disableDemo}, cb) => { + if (_.isEmpty(tokens)) + return cb('Tokens not set. Please reload agent archive or check settings'); + + if (ver && bt && !_.isEmpty(supportedAgents)) { + const btDistr = _.get(supportedAgents, [ver, 'buildTime']); + + if (_.isEmpty(btDistr) || btDistr < bt) + return cb('You are using an older version of the agent. Please reload agent'); + } + + return mongo.Account.find({token: {$in: tokens}}, '_id token').lean().exec() + .then((accounts) => { + const activeTokens = _.uniq(_.map(accounts, 'token')); + + if (_.isEmpty(activeTokens)) + return cb(`Failed to authenticate with token(s): ${tokens.join(',')}. Please reload agent archive or check settings`); + + cb(null, activeTokens); + + return this.onConnect(sock, activeTokens, disableDemo); + }) + // TODO IGNITE-1379 send error to web master. + .catch(() => cb(`Invalid token(s): ${tokens.join(',')}. Please reload agent archive or check settings`)); + }); + }); + }); + } + + agent(token, demo, clusterId) { + if (!this.io) + return Promise.reject(new Error('Agent server not started yet!')); + + const socks = this._agentSockets.get(token); + + if (_.isEmpty(socks)) + return Promise.reject(new Error('Failed to find connected agent for this token')); + + if (demo || _.isNil(clusterId)) + return Promise.resolve(_.head(socks)); + + const sock = _.find(socks, (agentSock) => _.get(agentSock, 'cluster.id') === clusterId); + + if (_.isEmpty(sock)) + return Promise.reject(new Error('Failed to find agent connected to cluster')); + + return Promise.resolve(sock); + } + + agents(token) { + if (!this.io) + return Promise.reject(new Error('Agent server not started yet!')); + + const socks = this._agentSockets.get(token); + + if (_.isEmpty(socks)) + return Promise.reject(new Error('Failed to find connected agent for this token')); + + return Promise.resolve(socks); + } + + tryStopDemo(browserSocket) { + const agentSock = this._agentSockets.find(browserSocket); + } + + /** + * @param {ObjectId} token + * @param {Socket} browserSock + * @returns {int} Connected agent count. + */ + onBrowserConnect(token, browserSock) { + this.emitAgentsCount(token); + + // If connect from browser with enabled demo. + const demo = browserSock.request._query.IgniteDemoMode === 'true'; + + // Agents where possible to run demo. + const agentSockets = _.filter(this._agentSockets[token], 'demo.enabled'); + + if (demo && _.size(agentSockets)) { + const agentSocket = _.find(agentSockets, (agent) => _.includes(agent.demo.tokens, token)); + + if (agentSocket) + agentSocket.attachToDemoCluster(browserSock); + else + _.head(agentSockets).runDemoCluster(token, [browserSock]); + } + } + + /** + * @param {Socket} browserSock. + */ + onBrowserDisconnect(browserSock) { + const token = browserSock.client.request.user.token; + + this._browserSockets.pull(token, browserSock); + + // If connect from browser with enabled demo. + if (browserSock.request._query.IgniteDemoMode === 'true') + this._agentSockets.find(token, (agent) => _.includes(agent.demo.browserSockets, browserSock)); + + // TODO If latest browser with demo need stop demo cluster on agent. + } + + /** + * Try stop agent for token if not used by others. + * + * @param {String} token + */ + onTokenReset(token) { + if (_.isNil(this.io)) + return; + + const sockets = this._agentSockets[token]; + + _.forEach(sockets, (socket) => socket._emit('agent:reset:token', token)); + } + } + + return new AgentsHandler(); +}; http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/backend/app/apiServer.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/apiServer.js b/modules/web-console/backend/app/apiServer.js new file mode 100644 index 0000000..affb9c9 --- /dev/null +++ b/modules/web-console/backend/app/apiServer.js @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +// Fire me up! + +module.exports = { + implements: 'api-server', + inject: ['require(express)', 'configure', 'routes'] +}; + +module.exports.factory = function(Express, configure, routes) { + /** + * Connected agents manager. + */ + class ApiServer { + /** + * @param {Server} srv + */ + attach(srv) { + const app = new Express(); + + configure.express(app); + + routes.register(app); + + // Catch 404 and forward to error handler. + app.use((req, res, next) => { + const err = new Error('Not Found: ' + req.originalUrl); + + err.status = 404; + + next(err); + }); + + // Production error handler: no stacktraces leaked to user. + app.use((err, req, res) => { + res.status(err.status || 500); + + res.render('error', { + message: err.message, + error: {} + }); + }); + + srv.addListener('request', app); + + return app; + } + } + + return new ApiServer(); +}; http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/backend/app/app.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/app.js b/modules/web-console/backend/app/app.js deleted file mode 100644 index eb236e7..0000000 --- a/modules/web-console/backend/app/app.js +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -'use strict'; - -// Fire me up! - -module.exports = { - implements: 'app', - inject: ['require(express)', 'configure', 'routes'] -}; - -module.exports.factory = function(Express, configure, routes) { - return { - /** - * @param {Server} srv - */ - listen: (srv) => { - const app = new Express(); - - configure.express(app); - - routes.register(app); - - // Catch 404 and forward to error handler. - app.use((req, res, next) => { - const err = new Error('Not Found: ' + req.originalUrl); - - err.status = 404; - - next(err); - }); - - // Production error handler: no stacktraces leaked to user. - app.use((err, req, res) => { - res.status(err.status || 500); - - res.render('error', { - message: err.message, - error: {} - }); - }); - - srv.addListener('request', app); - - return app; - } - }; -};
