IGNITE-5231 Web Console: Add support for Ignite 2.0 cluster on Queries screen.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/be012d82 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/be012d82 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/be012d82 Branch: refs/heads/ignite-5075 Commit: be012d82be955e37ab93ab4b0c17574fae50ce95 Parents: 88593b6 Author: Andrey Novikov <anovi...@gridgain.com> Authored: Tue May 16 18:01:09 2017 +0700 Committer: Andrey Novikov <anovi...@gridgain.com> Committed: Tue May 16 18:01:09 2017 +0700 ---------------------------------------------------------------------- .../web-console/backend/app/agentsHandler.js | 28 +-- .../web-console/backend/app/browsersHandler.js | 24 ++- .../app/modules/agent/AgentManager.service.js | 170 ++++++++++++------- .../frontend/app/modules/sql/sql.controller.js | 2 +- .../frontend/app/primitives/modal/index.scss | 21 +-- modules/web-console/frontend/package.json | 1 + .../frontend/views/includes/header-left.pug | 4 +- .../console/agent/handlers/ClusterListener.java | 90 +++++++--- .../console/agent/handlers/RestListener.java | 7 + .../ignite/console/agent/rest/RestExecutor.java | 19 ++- 10 files changed, 245 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/be012d82/modules/web-console/backend/app/agentsHandler.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/agentsHandler.js b/modules/web-console/backend/app/agentsHandler.js index d24cef8..a4ae385 100644 --- a/modules/web-console/backend/app/agentsHandler.js +++ b/modules/web-console/backend/app/agentsHandler.js @@ -81,7 +81,7 @@ module.exports.factory = function(_, fs, path, JSZip, socketio, settings, mongo, } class Cluster { - constructor(nids) { + constructor(top) { let d = new Date().getTime(); this.id = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => { @@ -92,15 +92,19 @@ module.exports.factory = function(_, fs, path, JSZip, socketio, settings, mongo, return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16); }); - this.nids = nids; + this.nids = top.nids; + + this.clusterVersion = top.clusterVersion; } - same(nids) { - return _.intersection(this.nids, nids).length > 0; + isSameCluster(top) { + return _.intersection(this.nids, top.nids).length > 0; } - updateTopology(nids) { - this.nids = nids; + update(top) { + this.clusterVersion = top.clusterVersion; + + this.nids = top.nids; } } @@ -187,11 +191,11 @@ module.exports.factory = function(_, fs, path, JSZip, socketio, settings, mongo, }); } - getOrCreateCluster(nids) { - const cluster = _.find(this.clusters, (c) => c.same(nids)); + getOrCreateCluster(top) { + const cluster = _.find(this.clusters, (c) => c.isSameCluster(top)); if (_.isNil(cluster)) - this.clusters.push(new Cluster(nids)); + this.clusters.push(new Cluster(top)); return cluster; } @@ -216,8 +220,8 @@ module.exports.factory = function(_, fs, path, JSZip, socketio, settings, mongo, }); }); - sock.on('cluster:topology', (nids) => { - const cluster = this.getOrCreateCluster(nids); + sock.on('cluster:topology', (top) => { + const cluster = this.getOrCreateCluster(top); if (_.isNil(agentSocket.cluster)) { agentSocket.cluster = cluster; @@ -227,7 +231,7 @@ module.exports.factory = function(_, fs, path, JSZip, socketio, settings, mongo, }); } else - cluster.updateTopology(nids); + cluster.update(top); }); sock.on('cluster:collector', (top) => { http://git-wip-us.apache.org/repos/asf/ignite/blob/be012d82/modules/web-console/backend/app/browsersHandler.js ---------------------------------------------------------------------- diff --git a/modules/web-console/backend/app/browsersHandler.js b/modules/web-console/backend/app/browsersHandler.js index 66ac5f8..f5937fe 100644 --- a/modules/web-console/backend/app/browsersHandler.js +++ b/modules/web-console/backend/app/browsersHandler.js @@ -114,11 +114,8 @@ module.exports.factory = (_, socketio, configure, errors) => { acc.count += 1; acc.hasDemo |= _.get(agentSock, 'demo.enabled'); - if (agentSock.cluster) { - acc.clusters.push({ - id: agentSock.cluster.id - }); - } + if (agentSock.cluster) + acc.clusters.push(agentSock.cluster); return acc; }, {count: 0, hasDemo: false, clusters: []}); @@ -199,10 +196,19 @@ module.exports.factory = (_, socketio, configure, errors) => { const internalVisor = (postfix) => `org.apache.ignite.internal.visor.${postfix}`; - this.registerVisorTask('querySql', internalVisor('query.VisorQueryTask'), internalVisor('query.VisorQueryTaskArg')); - this.registerVisorTask('queryScan', internalVisor('query.VisorScanQueryTask'), internalVisor('query.VisorScanQueryTaskArg')); - this.registerVisorTask('queryFetch', internalVisor('query.VisorQueryNextPageTask'), internalVisor('query.VisorQueryNextPageTaskArg')); - this.registerVisorTask('queryClose', internalVisor('query.VisorQueryCleanupTask'), internalVisor('query.VisorQueryCleanupTaskArg')); + this.registerVisorTask('querySql', internalVisor('query.VisorQueryTask'), internalVisor('query.VisorQueryArg')); + this.registerVisorTask('querySqlV2', internalVisor('query.VisorQueryTask'), internalVisor('query.VisorQueryArgV2')); + this.registerVisorTask('querySqlV3', internalVisor('query.VisorQueryTask'), internalVisor('query.VisorQueryArgV3')); + this.registerVisorTask('querySqlX2', internalVisor('query.VisorQueryTask'), internalVisor('query.VisorQueryTaskArg')); + + this.registerVisorTask('queryScanX2', internalVisor('query.VisorScanQueryTask'), internalVisor('query.VisorScanQueryTaskArg')); + + this.registerVisorTask('queryFetch', internalVisor('query.VisorQueryNextPageTask'), 'org.apache.ignite.lang.IgniteBiTuple', 'java.lang.String', 'java.lang.Integer'); + this.registerVisorTask('queryFetchX2', internalVisor('query.VisorQueryNextPageTask'), internalVisor('query.VisorQueryNextPageTaskArg')); + + this.registerVisorTask('queryClose', internalVisor('query.VisorQueryCleanupTask'), 'java.util.Map', 'java.util.UUID', 'java.util.Set'); + this.registerVisorTask('queryCloseX2', internalVisor('query.VisorQueryCleanupTask'), internalVisor('query.VisorQueryCleanupTaskArg')); + // Return command result from grid to browser. sock.on('node:visor', (clusterId, taskId, nids, ...args) => { http://git-wip-us.apache.org/repos/asf/ignite/blob/be012d82/modules/web-console/frontend/app/modules/agent/AgentManager.service.js ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js index cb77832..c511242 100644 --- a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js +++ b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js @@ -17,6 +17,8 @@ import io from 'socket.io-client'; // eslint-disable-line no-unused-vars +import { BehaviorSubject } from 'rxjs/BehaviorSubject'; + const maskNull = (val) => _.isNil(val) ? 'null' : val; const State = { @@ -39,17 +41,26 @@ export default class IgniteAgentManager { */ this.AgentModal = AgentModal; - this.clusters = []; + this.promises = new Set(); $root.$on('$stateChangeSuccess', () => this.stopWatch()); + this.ignite2x = false; + + $root.$watch(() => _.get(this, 'cluster.clusterVersion'), (ver) => { + if (_.isEmpty(ver)) + return; + + this.ignite2x = ver.startsWith('2.'); + }, true); + /** * Connection to backend. * @type {Socket} */ this.socket = null; - this.connectionState = State.INIT; + this.connectionState = new BehaviorSubject(State.INIT); /** * Has agent with enabled demo mode. @@ -111,11 +122,11 @@ export default class IgniteAgentManager { } if (count === 0) - self.connectionState = State.AGENT_DISCONNECTED; - else { - self.connectionState = self.$root.IgniteDemoMode || _.get(self.cluster, 'disconnect') === false ? - State.CONNECTED : State.CLUSTER_DISCONNECTED; - } + self.connectionState.next(State.AGENT_DISCONNECTED); + else if (self.$root.IgniteDemoMode || _.get(self.cluster, 'disconnect') === false) + self.connectionState.next(State.CONNECTED); + else + self.connectionState.next(State.CLUSTER_DISCONNECTED); }); } @@ -132,17 +143,23 @@ export default class IgniteAgentManager { * @returns {Promise} */ awaitConnectionState(...states) { - this.latchAwaitStates = this.$q.defer(); + const defer = this.$q.defer(); - this.offAwaitAgent = this.$root.$watch(() => this.connectionState, (state) => { - if (_.includes(states, state)) { - this.offAwaitAgent(); + this.promises.add(defer); - this.latchAwaitStates.resolve(); + const subscription = this.connectionState.subscribe({ + next: (state) => { + if (_.includes(states, state)) + defer.resolve(); } }); - return this.latchAwaitStates.promise; + return defer.promise + .finally(() => { + subscription.unsubscribe(); + + this.promises.delete(defer); + }); } awaitCluster() { @@ -167,24 +184,26 @@ export default class IgniteAgentManager { if (_.nonEmpty(self.clusters) && _.get(self.cluster, 'disconnect') === true) { self.cluster = _.head(self.clusters); - self.connectionState = State.CONNECTED; + self.connectionState.next(State.CONNECTED); } - self.offStateWatch = this.$root.$watch(() => self.connectionState, (state) => { - switch (state) { - case State.CONNECTED: - case State.CLUSTER_DISCONNECTED: - this.AgentModal.hide(); + self.modalSubscription = this.connectionState.subscribe({ + next: (state) => { + switch (state) { + case State.CONNECTED: + case State.CLUSTER_DISCONNECTED: + this.AgentModal.hide(); - break; + break; - case State.AGENT_DISCONNECTED: - this.AgentModal.agentDisconnected(self.backText, self.backState); + case State.AGENT_DISCONNECTED: + this.AgentModal.agentDisconnected(self.backText, self.backState); - break; + break; - default: + default: // Connection to backend is not established yet. + } } }); @@ -205,28 +224,30 @@ export default class IgniteAgentManager { if (_.nonEmpty(self.clusters) && _.get(self.cluster, 'disconnect') === true) { self.cluster = _.head(self.clusters); - self.connectionState = State.CONNECTED; + self.connectionState.next(State.CONNECTED); } - self.offStateWatch = this.$root.$watch(() => self.connectionState, (state) => { - switch (state) { - case State.CONNECTED: - this.AgentModal.hide(); + self.modalSubscription = this.connectionState.subscribe({ + next: (state) => { + switch (state) { + case State.CONNECTED: + this.AgentModal.hide(); - break; + break; - case State.AGENT_DISCONNECTED: - this.AgentModal.agentDisconnected(self.backText, self.backState); + case State.AGENT_DISCONNECTED: + this.AgentModal.agentDisconnected(self.backText, self.backState); - break; + break; - case State.CLUSTER_DISCONNECTED: - self.AgentModal.clusterDisconnected(self.backText, self.backState); + case State.CLUSTER_DISCONNECTED: + self.AgentModal.clusterDisconnected(self.backText, self.backState); - break; + break; - default: + default: // Connection to backend is not established yet. + } } }); @@ -234,18 +255,11 @@ export default class IgniteAgentManager { } stopWatch() { - if (!_.isFunction(this.offStateWatch)) - return; - - this.offStateWatch(); + this.modalSubscription && this.modalSubscription.unsubscribe(); this.AgentModal.hide(); - if (this.latchAwaitStates) { - this.offAwaitAgent(); - - this.latchAwaitStates.reject('Agent watch stopped.'); - } + this.promises.forEach((promise) => promise.reject('Agent watch stopped.')); } /** @@ -460,12 +474,33 @@ export default class IgniteAgentManager { * @returns {Promise} */ querySql(nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSz) { - return this.visorTask('querySql', nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSz) - .then(({error, result}) => { - if (_.isEmpty(error)) - return result; + if (this.ignite2x) { + return this.visorTask('querySqlX2', nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSz) + .then(({error, result}) => { + if (_.isEmpty(error)) + return result; + + return Promise.reject(error); + }); + } + + cacheName = _.isEmpty(cacheName) ? null : cacheName; + + let queryPromise; - return Promise.reject(error); + if (enforceJoinOrder) + queryPromise = this.visorTask('querySqlV3', nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, local, pageSz); + else if (nonCollocatedJoins) + queryPromise = this.visorTask('querySqlV2', nid, cacheName, query, nonCollocatedJoins, local, pageSz); + else + queryPromise = this.visorTask('querySql', nid, cacheName, query, local, pageSz); + + return queryPromise + .then(({key, value}) => { + if (_.isEmpty(key)) + return value; + + return Promise.reject(key); }); } @@ -517,8 +552,12 @@ export default class IgniteAgentManager { * @returns {Promise} */ queryClose(nid, queryId) { - return this.visorTask('queryClose', nid, 'java.util.Map', 'java.util.UUID', 'java.util.Collection', - nid + '=' + queryId); + if (this.ignite2x) { + return this.visorTask('queryClose', nid, 'java.util.Map', 'java.util.UUID', 'java.util.Collection', + nid + '=' + queryId); + } + + return this.visorTask('queryClose', nid, queryId); } /** @@ -533,13 +572,26 @@ export default class IgniteAgentManager { * @returns {Promise} */ queryScan(nid, cacheName, filter, regEx, caseSensitive, near, local, pageSize) { - return this.visorTask('queryScan', nid, cacheName, filter, regEx, caseSensitive, near, local, pageSize) - .then(({error, result}) => { - if (_.isEmpty(error)) - return result; + if (this.ignite2x) { + return this.visorTask('queryScanX2', nid, cacheName, filter, regEx, caseSensitive, near, local, pageSize) + .then(({error, result}) => { + if (_.isEmpty(error)) + return result; - return Promise.reject(error); - }); + return Promise.reject(error); + }); + } + + /** Prefix for node local key for SCAN near queries. */ + const SCAN_CACHE_WITH_FILTER = 'VISOR_SCAN_CACHE_WITH_FILTER'; + + /** Prefix for node local key for SCAN near queries. */ + const SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE = 'VISOR_SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE'; + + const prefix = caseSensitive ? SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE : SCAN_CACHE_WITH_FILTER; + const query = `${prefix}${filter}`; + + return this.querySql(nid, cacheName, query, false, false, false, local, pageSize); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/be012d82/modules/web-console/frontend/app/modules/sql/sql.controller.js ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/modules/sql/sql.controller.js b/modules/web-console/frontend/app/modules/sql/sql.controller.js index 3806351..cf9f917 100644 --- a/modules/web-console/frontend/app/modules/sql/sql.controller.js +++ b/modules/web-console/frontend/app/modules/sql/sql.controller.js @@ -1628,7 +1628,7 @@ export default ['$rootScope', '$scope', '$http', '$q', '$timeout', '$interval', return Promise.resolve(args.localNid || _chooseNode(args.cacheName, false)) .then((nid) => args.type === 'SCAN' - ? agentMgr.queryScanGetAll(nid, args.cacheName, args.filter, !!args.regEx, !!args.caseSensitive, !!args.near, !!args.localNid) + ? agentMgr.queryScanGetAll(nid, args.cacheName, args.query, !!args.regEx, !!args.caseSensitive, !!args.near, !!args.localNid) : agentMgr.querySqlGetAll(nid, args.cacheName, args.query, !!args.nonCollocatedJoins, !!args.enforceJoinOrder, false, !!args.localNid)) .then((res) => _export(paragraph.name + '-all.csv', paragraph.gridOptions.columnDefs, res.columns, res.rows)) .catch(Messages.showError) http://git-wip-us.apache.org/repos/asf/ignite/blob/be012d82/modules/web-console/frontend/app/primitives/modal/index.scss ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/primitives/modal/index.scss b/modules/web-console/frontend/app/primitives/modal/index.scss index dc0dfee..c1c1fbf 100644 --- a/modules/web-console/frontend/app/primitives/modal/index.scss +++ b/modules/web-console/frontend/app/primitives/modal/index.scss @@ -32,18 +32,18 @@ .modal-header { border-top-left-radius: 6px; border-top-right-radius: 6px; -} -// Close icon -.modal-header .close { - margin-right: -2px; -} + // Close icon + .close { + margin-right: -2px; + } -// Modal icon -.modal-header h4 > i.fa { - cursor: default; - float: left; - line-height: $modal-title-line-height; + // Modal icon + h4 > i { + cursor: default; + float: left; + line-height: $modal-title-line-height; + } } .modal .modal-dialog { @@ -95,6 +95,7 @@ .modal-body-with-scroll { max-height: 420px; + overflow-y: auto; overflow-y: overlay; margin: 0; } http://git-wip-us.apache.org/repos/asf/ignite/blob/be012d82/modules/web-console/frontend/package.json ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/package.json b/modules/web-console/frontend/package.json index fccc944..43cf0c9 100644 --- a/modules/web-console/frontend/package.json +++ b/modules/web-console/frontend/package.json @@ -60,6 +60,7 @@ "nvd3": "1.8.4", "raleway-webfont": "3.0.1", "roboto-font": "0.1.0", + "rxjs": "5.4.0", "socket.io-client": "1.7.3", "ui-router-metatags": "1.0.3" }, http://git-wip-us.apache.org/repos/asf/ignite/blob/be012d82/modules/web-console/frontend/views/includes/header-left.pug ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/views/includes/header-left.pug b/modules/web-console/frontend/views/includes/header-left.pug index 7578705..4dda1cc 100644 --- a/modules/web-console/frontend/views/includes/header-left.pug +++ b/modules/web-console/frontend/views/includes/header-left.pug @@ -27,8 +27,8 @@ span.caret .wch-nav-item(ng-controller='notebookController') - div(ng-if='IgniteDemoMode' ng-class='{active: $state.includes("base.sql")}') - a(ui-sref='base.sql.demo') Queries + div(ng-if='IgniteDemoMode' ui-sref='base.sql.demo' ng-class='{active: $state.includes("base.sql")}') + span Queries div(ng-if='!IgniteDemoMode') div(ng-if='!notebooks.length' ng-class='{active: $state.includes("base.sql")}') http://git-wip-us.apache.org/repos/asf/ignite/blob/be012d82/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/ClusterListener.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/ClusterListener.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/ClusterListener.java index 939b35a..27f5317 100644 --- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/ClusterListener.java +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/ClusterListener.java @@ -37,10 +37,12 @@ import org.apache.ignite.internal.processors.rest.protocols.http.jetty.GridJetty import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteProductVersion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.ignite.console.agent.AgentUtils.toJSON; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER; import static org.apache.ignite.internal.processors.rest.GridRestResponse.STATUS_SUCCESS; /** @@ -65,8 +67,8 @@ public class ClusterListener { /** JSON object mapper. */ private static final ObjectMapper mapper = new GridJettyObjectMapper(); - /** Nids. */ - private Collection<UUID> latestNids = Collections.emptyList(); + /** Latest topology snapshot. */ + private TopologySnapshot top; /** */ private final WatchTask watchTask = new WatchTask(); @@ -132,14 +134,14 @@ public class ClusterListener { * Callback on disconnect from cluster. */ private void clusterDisconnect() { - if (latestNids.isEmpty()) + if (top == null) return; - - latestNids = Collections.emptyList(); + + top = null; log.info("Connection to cluster was lost"); - client.emit(EVENT_CLUSTER_DISCONNECTED, latestNids); + client.emit(EVENT_CLUSTER_DISCONNECTED); } /** @@ -188,30 +190,68 @@ public class ClusterListener { } /** */ + private class TopologySnapshot { + /** */ + private Collection<UUID> nids; + + /** */ + private String clusterVersion; + + /** + * @param nodes Nodes. + */ + TopologySnapshot(Collection<GridClientNodeBean> nodes) { + nids = F.viewReadOnly(nodes, NODE2ID); + + Collection<IgniteProductVersion> vers = F.transform(nodes, + new IgniteClosure<GridClientNodeBean, IgniteProductVersion>() { + @Override public IgniteProductVersion apply(GridClientNodeBean bean) { + return IgniteProductVersion.fromString((String)bean.getAttributes().get(ATTR_BUILD_VER)); + } + }); + + clusterVersion = Collections.min(vers).toString(); + } + + /** */ + Collection<String> nid8() { + return F.viewReadOnly(nids, ID2ID8); + } + + /** */ + boolean isSameCluster(TopologySnapshot snapshot) { + if (snapshot == null || F.isEmpty(snapshot.nids)) + return false; + + return Collections.disjoint(nids, snapshot.nids); + } + } + + /** */ private class WatchTask implements Runnable { /** {@inheritDoc} */ @Override public void run() { try { - RestResult top = restExecutor.topology(false, false); + RestResult res = restExecutor.topology(false, false); - switch (top.getStatus()) { + switch (res.getStatus()) { case STATUS_SUCCESS: - List<GridClientNodeBean> nodes = mapper.readValue(top.getData(), + List<GridClientNodeBean> nodes = mapper.readValue(res.getData(), new TypeReference<List<GridClientNodeBean>>() {}); - Collection<UUID> nids = F.viewReadOnly(nodes, NODE2ID); + TopologySnapshot newTop = new TopologySnapshot(nodes); - if (Collections.disjoint(latestNids, nids)) - log.info("Connection successfully established to cluster with nodes: {}", F.viewReadOnly(nids, ID2ID8)); + if (newTop.isSameCluster(top)) + log.info("Connection successfully established to cluster with nodes: {}", newTop.nid8()); - client.emit(EVENT_CLUSTER_TOPOLOGY, nids); + top = newTop; - latestNids = nids; + client.emit(EVENT_CLUSTER_TOPOLOGY, toJSON(top)); break; default: - log.warn(top.getError()); + log.warn(res.getError()); clusterDisconnect(); } @@ -227,31 +267,31 @@ public class ClusterListener { /** {@inheritDoc} */ @Override public void run() { try { - RestResult top = restExecutor.topology(false, true); + RestResult res = restExecutor.topology(false, true); - switch (top.getStatus()) { + switch (res.getStatus()) { case STATUS_SUCCESS: - List<GridClientNodeBean> nodes = mapper.readValue(top.getData(), + List<GridClientNodeBean> nodes = mapper.readValue(res.getData(), new TypeReference<List<GridClientNodeBean>>() {}); - Collection<UUID> nids = F.viewReadOnly(nodes, NODE2ID); - - if (Collections.disjoint(latestNids, nids)) { - clusterConnect(nids); + TopologySnapshot newTop = new TopologySnapshot(nodes); + if (top == null || top.isSameCluster(newTop)) { clusterDisconnect(); + log.info("Connection successfully established to cluster with nodes: {}", newTop.nid8()); + watch(); } - latestNids = nids; + top = newTop; - client.emit(EVENT_CLUSTER_TOPOLOGY, top.getData()); + client.emit(EVENT_CLUSTER_TOPOLOGY, res.getData()); break; default: - log.warn(top.getError()); + log.warn(res.getError()); clusterDisconnect(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/be012d82/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java index 2588e8e..c70514d 100644 --- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java @@ -18,6 +18,8 @@ package org.apache.ignite.console.agent.handlers; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.ignite.console.agent.rest.RestExecutor; /** @@ -35,6 +37,11 @@ public class RestListener extends AbstractListener { } /** {@inheritDoc} */ + @Override protected ExecutorService newThreadPool() { + return Executors.newCachedThreadPool(); + } + + /** {@inheritDoc} */ @Override public Object execute(Map<String, Object> args) throws Exception { if (log.isDebugEnabled()) log.debug("Start parse REST command args: " + args); http://git-wip-us.apache.org/repos/asf/ignite/blob/be012d82/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestExecutor.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestExecutor.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestExecutor.java index 904b2b8..bfeef58 100644 --- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestExecutor.java +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestExecutor.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.net.ConnectException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; +import okhttp3.Dispatcher; import okhttp3.FormBody; import okhttp3.HttpUrl; import okhttp3.MediaType; @@ -60,15 +62,26 @@ public class RestExecutor { public RestExecutor(String nodeUrl) { this.nodeUrl = nodeUrl; - httpClient = new OkHttpClient.Builder().build(); + Dispatcher dispatcher = new Dispatcher(); + + dispatcher.setMaxRequests(Integer.MAX_VALUE); + dispatcher.setMaxRequestsPerHost(Integer.MAX_VALUE); + + httpClient = new OkHttpClient.Builder() + .readTimeout(0, TimeUnit.MILLISECONDS) + .dispatcher(dispatcher) + .build(); } /** * Stop HTTP client. */ public void stop() { - if (httpClient != null) + if (httpClient != null) { httpClient.dispatcher().executorService().shutdown(); + + httpClient.dispatcher().cancelAll(); + } } /** */ @@ -189,7 +202,7 @@ public class RestExecutor { Map<String, Object> params = new HashMap<>(3); params.put("cmd", "top"); - params.put("attr", full); + params.put("attr", true); params.put("mtr", full); return sendRequest(demo, "ignite", params, "GET", null, null);