zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc085a4a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc085a4a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc085a4a Branch: refs/heads/ignite-zk-ce Commit: fc085a4a018c1f90861a76842f3eebdeee0ba567 Parents: 8101455 Author: sboikov <[email protected]> Authored: Mon Dec 18 10:49:43 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 18 12:50:53 2017 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 4 +- .../communication/GridIoMessageFactory.java | 12 ++++ .../ignite/internal/util/nio/GridNioServer.java | 59 +++++++++++-------- .../TcpCommunicationConnectionCheckFuture.java | 61 ++++++++++++++++---- .../internal/ZkCommunicationErrorNodeState.java | 44 ++++++++++++++ .../ZkCommunicationErrorProcessFuture.java | 41 ++++++++++--- .../ZkCommunicationErrorResolveResult.java | 3 + .../ZkDistributedCollectDataFuture.java | 11 ++++ .../discovery/zk/internal/ZkRuntimeState.java | 5 ++ .../zk/internal/ZookeeperDiscoveryImpl.java | 37 ++++++++++-- .../ZookeeperDiscoverySpiBasicTest.java | 23 +++++--- 11 files changed, 241 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 9c6271a..81f00e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -300,9 +300,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) throws IgniteCheckedException { - assert rmtNodeId != null; - return new DirectMessageReader(msgFactory, U.directProtocolVersion(ctx, rmtNodeId)); + return new DirectMessageReader(msgFactory, + rmtNodeId != null ? U.directProtocolVersion(ctx, rmtNodeId) : GridIoManager.DIRECT_PROTO_VER); } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 78cb7a8..51a6e25 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -117,6 +117,8 @@ import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; +import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage; +import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage; import org.apache.ignite.internal.processors.continuous.GridContinuousMessage; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; @@ -879,6 +881,16 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 129: + msg = new ClusterMetricsUpdateMessage(); + + break; + + + case 130: + msg = new ContinuousRoutineStartResultMessage(); + + break; // [-3..119] [124..128] [-23..-27] [-36..-55]- this // [120..123] - DR http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 9784549..e95f957 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -2301,7 +2301,11 @@ public class GridNioServer<T> { else if (log.isDebugEnabled()) log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']'); - close(ses, new GridNioException(e)); + // Can be null if async connect failed. + if (ses != null) + close(ses, new GridNioException(e)); + else + closeKey(key); } } } @@ -2525,6 +2529,34 @@ public class GridNioServer<T> { } /** + * @param key Key. + */ + private void closeKey(SelectionKey key) { + // Shutdown input and output so that remote client will see correct socket close. + Socket sock = ((SocketChannel)key.channel()).socket(); + + try { + try { + sock.shutdownInput(); + } + catch (IOException ignored) { + // No-op. + } + + try { + sock.shutdownOutput(); + } + catch (IOException ignored) { + // No-op. + } + } + finally { + U.close(key, log); + U.close(sock, log); + } + } + + /** * Closes the session and all associated resources, then notifies the listener. * * @param ses Session to be closed. @@ -2544,8 +2576,6 @@ public class GridNioServer<T> { sessions.remove(ses); workerSessions.remove(ses); - SelectionKey key = ses.key(); - if (ses.setClosed()) { ses.onClosed(); @@ -2557,28 +2587,7 @@ public class GridNioServer<T> { ((DirectBuffer)ses.readBuffer()).cleaner().clean(); } - // Shutdown input and output so that remote client will see correct socket close. - Socket sock = ((SocketChannel)key.channel()).socket(); - - try { - try { - sock.shutdownInput(); - } - catch (IOException ignored) { - // No-op. - } - - try { - sock.shutdownOutput(); - } - catch (IOException ignored) { - // No-op. - } - } - finally { - U.close(key, log); - U.close(sock, log); - } + closeKey(ses.key()); if (e != null) filterChain.onExceptionCaught(ses, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java index 6cb5622..170ee44 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java @@ -111,13 +111,6 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit resBitSet = new BitSet(nodes.size()); } - /** {@inheritDoc} */ - @Override public void onEvent(Event evt) { - assert evt instanceof DiscoveryEvent : evt; - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ; - - } - /** * @param timeout Connect timeout. */ @@ -160,6 +153,8 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i); fut.init(addrs); + + futs[i] = fut; } } else @@ -171,7 +166,7 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED); if (!isDone()) { - endTime = System.currentTimeMillis() - timeout; + endTime = System.currentTimeMillis() + timeout; spi.getSpiContext().addTimeoutObject(this); } @@ -211,8 +206,30 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit } /** {@inheritDoc} */ + @Override public void onEvent(Event evt) { + if (isDone()) + return; + + assert evt instanceof DiscoveryEvent : evt; + assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ; + + UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); + + for (int i = 0; i < nodes.size(); i++) { + if (nodes.get(i).id().equals(nodeId)) { + ConnectFuture fut = futs[i]; + + if (fut != null) + fut.onNodeFailed(); + + return; + } + } + } + + /** {@inheritDoc} */ @Override public void onTimeout() { - if (!isDone()) + if (isDone()) return; ConnectFuture[] futs = this.futs; @@ -230,6 +247,8 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit if (super.onDone(res, err)) { spi.getSpiContext().removeTimeoutObject(this); + spi.getSpiContext().removeLocalEventListener(this); + return true; } @@ -244,6 +263,11 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit * */ void onTimeout(); + + /** + * + */ + void onNodeFailed(); } /** @@ -325,11 +349,16 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit finish(nodeId(nodeIdx).equals(rmtNodeId)); } + /** {@inheritDoc} */ + @Override public void onNodeFailed() { + cancel(); + } + /** * @param res Result. * @return {@code True} if result was set by this call. */ - boolean finish(boolean res) { + public boolean finish(boolean res) { if (connFutDoneUpdater.compareAndSet(this, 0, 1)) { onStatusReceived(res); @@ -369,6 +398,18 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit } /** {@inheritDoc} */ + @Override public void onNodeFailed() { + SingleAddressConnectFuture[] futs = this.futs; + + for (int i = 0; i < futs.length; i++) { + ConnectFuture fut = futs[i]; + + if (fut != null) + fut.onNodeFailed(); + } + } + + /** {@inheritDoc} */ @Override public void onTimeout() { SingleAddressConnectFuture[] futs = this.futs; http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java new file mode 100644 index 0000000..ddc310d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.BitSet; + +/** + * + */ +public class ZkCommunicationErrorNodeState implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final BitSet commState; + + /** */ + private final Exception err; + + /** + * @param commState Communication state. + * @param err Error if failed get communication state.. + */ + ZkCommunicationErrorNodeState(BitSet commState, Exception err) { + this.commState = commState; + this.err = err; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java index d7d4bd1..a6294bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.zk.internal; +import java.util.BitSet; import java.util.Collection; import java.util.List; import java.util.Map; @@ -29,6 +30,8 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.IgniteSpiTimeoutObject; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; @@ -141,19 +144,41 @@ class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implemen } /** - * @param locNodeOrder Local node order. * @param rtState Runtime state. * @param futPath Future path. * @param nodes Nodes to ping. - * @throws Exception If failed. */ - void pingNodesAndNotifyFuture(long locNodeOrder, ZkRuntimeState rtState, String futPath, List<ClusterNode> nodes) - throws Exception { - TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi(); - - spi.checkConnection(nodes); + void pingNodesAndNotifyFuture(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) throws Exception { + final TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi(); + + IgniteFuture<BitSet> fut = spi.checkConnection(nodes); + + fut.listen(new IgniteInClosure<IgniteFuture<BitSet>>() { + @Override public void apply(final IgniteFuture<BitSet> fut) { + // Future completed either from NIO thread or timeout worker, save result from another thread. + impl.runInWorkerThread(new ZkRunnable(rtState, impl) { + @Override public void run0() throws Exception { + BitSet commState = null; + Exception err = null; + + try { + commState = fut.get(); + } + catch (Exception e) { + err = e; + } + + ZkCommunicationErrorNodeState state = new ZkCommunicationErrorNodeState(commState, err); + + ZkDistributedCollectDataFuture.saveNodeResult(futPath, + rtState.zkClient, + impl.localNode().order(), + impl.marshalZip(state)); + } + }); - ZkDistributedCollectDataFuture.saveNodeResult(futPath, rtState.zkClient, locNodeOrder, null); + } + }); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java index 745496b..607f93b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java @@ -31,6 +31,9 @@ class ZkCommunicationErrorResolveResult implements Serializable { /** */ final GridLongList failedNodes; + /** + * @param failedNodes + */ ZkCommunicationErrorResolveResult(@Nullable GridLongList failedNodes) { this.failedNodes = failedNodes; } http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java index e5d2356..19e2acc 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java @@ -107,6 +107,17 @@ class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> { } /** + * @param futPath + * @param client + * @param nodeOrder + * @return Node result data. + * @throws Exception If fai.ed + */ + static byte[] readNodeResult(String futPath, ZookeeperClient client, long nodeOrder) throws Exception { + return client.getData(futPath + "/" + nodeOrder); + } + + /** * @param futResPath Result path. * @param client Client. * @param data Result data. http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java index fc03f8d..dc7b1bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java @@ -17,6 +17,8 @@ package org.apache.ignite.spi.discovery.zk.internal; +import java.util.List; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.spi.IgniteSpiTimeoutObject; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.Watcher; @@ -73,6 +75,9 @@ class ZkRuntimeState { /** */ final ZkClusterNodes top = new ZkClusterNodes(); + /** */ + List<ClusterNode> commErrProcNodes; + /** * @param prevJoined {@code True} if joined topology before reconnect attempt. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 65bf6e7..62fc581 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -2056,6 +2056,8 @@ public class ZookeeperDiscoveryImpl { rtState.evtsData.communicationErrorResolveFutureId(null); + rtState.commErrProcNodes = null; + ZkCommunicationErrorResolveResult res = msg.res; if (res == null) @@ -2120,7 +2122,8 @@ public class ZookeeperDiscoveryImpl { final String futPath = zkPaths.distributedFutureBasePath(msg.id); final ZkCommunicationErrorProcessFuture fut0 = fut; - final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); + + rtState.commErrProcNodes = rtState.top.topologySnapshot(); if (rtState.crd) { ZkDistributedCollectDataFuture nodeResFut = collectCommunicationStatusFuture(msg.id); @@ -2130,7 +2133,7 @@ public class ZookeeperDiscoveryImpl { runInWorkerThread(new ZkRunnable(rtState, this) { @Override protected void run0() throws Exception { - fut0.pingNodesAndNotifyFuture(locNode.order(), rtState, futPath, topSnapshot); + fut0.pingNodesAndNotifyFuture(rtState, futPath, rtState.commErrProcNodes); } }); } @@ -2145,7 +2148,7 @@ public class ZookeeperDiscoveryImpl { new Callable<Void>() { @Override public Void call() throws Exception { // Future is completed from ZK event thread. - onCommunicationResolveStatusReceived(rtState); + onCommunicationErrorResolveStatusReceived(rtState); return null; } @@ -2157,16 +2160,38 @@ public class ZookeeperDiscoveryImpl { * @param rtState Runtime state. * @throws Exception If failed. */ - private void onCommunicationResolveStatusReceived(ZkRuntimeState rtState) throws Exception { + private void onCommunicationErrorResolveStatusReceived(ZkRuntimeState rtState) throws Exception { ZkDiscoveryEventsData evtsData = rtState.evtsData; UUID futId = rtState.evtsData.communicationErrorResolveFutureId(); if (log.isInfoEnabled()) - log.info("Received communication status from all nodes, call resolver [reqId=" + futId + ']'); + log.info("Received communication status from all nodes [reqId=" + futId + ']'); assert futId != null; + String futPath = zkPaths.distributedFutureBasePath(futId); + + List<ClusterNode> initialNodes = rtState.commErrProcNodes; + + assert initialNodes != null; + + rtState.commErrProcNodes = null; + + ZkClusterNodes top = rtState.top; + + List<ZkCommunicationErrorNodeState> nodesRes = new ArrayList<>(); + + for (ZookeeperClusterNode node : top.nodesByOrder.values()) { + byte[] stateBytes = ZkDistributedCollectDataFuture.readNodeResult(futPath, + rtState.zkClient, + node.order()); + + ZkCommunicationErrorNodeState nodeState = unmarshalZip(stateBytes); + + nodesRes.add(nodeState); + } + ZkCommunicationErrorResolveFinishMessage msg = new ZkCommunicationErrorResolveFinishMessage(futId); ZkCommunicationErrorResolveResult res = new ZkCommunicationErrorResolveResult(null); @@ -2663,7 +2688,7 @@ public class ZookeeperDiscoveryImpl { * @return Bytes. * @throws IgniteCheckedException If failed. */ - private byte[] marshalZip(Object obj) throws IgniteCheckedException { + byte[] marshalZip(Object obj) throws IgniteCheckedException { assert obj != null; return U.zip(marsh.marshal(obj)); http://git-wip-us.apache.org/repos/asf/ignite/blob/fc085a4a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index a2e8784..cee2e76 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -1916,20 +1916,27 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** - * TODO ZK: move to comm spi tests. - * * @throws Exception If failed. */ - public void testNodesPing() throws Exception { - startGrids(3); + public void testConnectionCheck() throws Exception { + final int NODES = 5; + + startGridsMultiThreaded(NODES); + + for (int i = 0; i < NODES; i++) { + Ignite node = ignite(i); + + TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi(); - TcpCommunicationSpi spi = (TcpCommunicationSpi)ignite(1).configuration().getCommunicationSpi(); + List<ClusterNode> nodes = new ArrayList<>(); - List<ClusterNode> nodes = new ArrayList<>(); + nodes.addAll(node.cluster().nodes()); - nodes.add(ignite(2).cluster().localNode()); + BitSet res = spi.checkConnection(nodes).get(); - // spi.pingNodes(nodes); + for (int j = 0; j < NODES; j++) + assertTrue(res.get(j)); + } } /**
