Repository: ignite Updated Branches: refs/heads/ignite-zk a6b452823 -> 6cb8c06f7
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6cb8c06f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6cb8c06f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6cb8c06f Branch: refs/heads/ignite-zk Commit: 6cb8c06f73ace3030f47ccfe21e2f46d6b054e5f Parents: a6b4528 Author: sboikov <sboi...@gridgain.com> Authored: Wed Nov 22 13:28:51 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Nov 22 13:39:39 2017 +0300 ---------------------------------------------------------------------- .../zk/internal/ZookeeperDiscoveryImpl.java | 16 +- .../zk/ZookeeperDiscoverySpiBasicTest.java | 911 ------------------ .../ZookeeperDiscoverySpiBasicTest.java | 942 +++++++++++++++++++ 3 files changed, 947 insertions(+), 922 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6cb8c06f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 5e9c5a3..11c6a6e 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -322,20 +322,10 @@ public class ZookeeperDiscoveryImpl { dirs.add(zkPaths.evtsPath); dirs.add(zkPaths.joinDataDir); dirs.add(zkPaths.customEvtsDir); + dirs.add(zkPaths.customEvtsAcksDir); dirs.add(zkPaths.aliveNodesDir); zkClient.createAllIfNeeded(dirs, PERSISTENT); -// zkClient.createIfNeeded(zkPaths.basePath, null, PERSISTENT); -// -// zkClient.createIfNeeded(zkPaths.clusterDir, null, PERSISTENT); -// -// zkClient.createIfNeeded(zkPaths.evtsPath, null, PERSISTENT); -// -// zkClient.createIfNeeded(zkPaths.joinDataDir, null, PERSISTENT); -// -// zkClient.createIfNeeded(zkPaths.customEvtsDir, null, PERSISTENT); -// -// zkClient.createIfNeeded(zkPaths.aliveNodesDir, null, PERSISTENT); } catch (ZookeeperClientFailedException e) { throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); @@ -821,6 +811,10 @@ public class ZookeeperDiscoveryImpl { zkClient.deleteAll(zkPaths.customEvtsDir, zkClient.getChildren(zkPaths.customEvtsDir), -1); + + zkClient.deleteAll(zkPaths.customEvtsAcksDir, + zkClient.getChildren(zkPaths.customEvtsDir), + -1); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6cb8c06f/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java deleted file mode 100644 index d579c08..0000000 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java +++ /dev/null @@ -1,911 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.zk; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.curator.test.TestingCluster; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.IgnitionEx; -import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteCallable; -import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.marshaller.jdk.JdkMarshaller; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; -import org.apache.zookeeper.ZooKeeper; - -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD; -import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; - -/** - * - */ -public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { - /** */ - private TestingCluster zkCluster; - - /** */ - private static final boolean USE_TEST_CLUSTER = true; - - /** */ - private boolean client; - - /** */ - private static ConcurrentHashMap<UUID, Map<Long, DiscoveryEvent>> evts = new ConcurrentHashMap<>(); - - /** */ - private static volatile boolean err; - - /** */ - private boolean testSockNio; - - /** */ - private int sesTimeout; - - /** */ - private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>(); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - if (testSockNio) - System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, ZkTestClientCnxnSocketNIO.class.getName()); - - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - cfg.setConsistentId(igniteInstanceName); - - ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi(); - - zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000); - - spis.put(igniteInstanceName, zkSpi); - - if (USE_TEST_CLUSTER) { - assert zkCluster != null; - - zkSpi.setZkConnectionString(zkCluster.getConnectString()); - } - else - zkSpi.setZkConnectionString("localhost:2181"); - - cfg.setDiscoverySpi(zkSpi); - - CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); - - ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - - cfg.setCacheConfiguration(ccfg); - - // cfg.setMarshaller(new JdkMarshaller()); - - cfg.setClientMode(client); - - Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>(); - - lsnrs.put(new IgnitePredicate<Event>() { - /** */ - @IgniteInstanceResource - private Ignite ignite; - - @Override public boolean apply(Event evt) { - try { - DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt; - - UUID locId = ignite.cluster().localNode().id(); - - Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId); - - if (nodeEvts == null) { - Object old = evts.put(locId, nodeEvts = new TreeMap<>()); - - assertNull(old); - - synchronized (nodeEvts) { - DiscoveryLocalJoinData locJoin = ((IgniteKernal)ignite).context().discovery().localJoin(); - - nodeEvts.put(locJoin.event().topologyVersion(), locJoin.event()); - } - } - - synchronized (nodeEvts) { - DiscoveryEvent old = nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt); - - assertNull(old); - } - } - catch (Throwable e) { - err = true; - - info("Unexpected error: " + e); - } - - return true; - } - }, new int[]{EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT}); - - cfg.setLocalEventListeners(lsnrs); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1"); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - System.clearProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD); - - super.afterTestsStopped(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - if (USE_TEST_CLUSTER) { - zkCluster = new TestingCluster(1); - zkCluster.start(); - } - - reset(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - if (zkCluster != null) { - try { - zkCluster.close(); - } - catch (Exception e) { - U.error(log, "Failed to stop Zookeeper client: " + e, e); - } - - zkCluster = null; - } - - super.afterTest(); - - try { - assertFalse("Unexpected error, see log for details", err); - - checkEventsConsistency(); - } - finally { - reset(); - } - - stopAllGrids(); - } - - /** - * @throws Exception If failed. - */ - public void testStopNode_1() throws Exception { - startGrids(5); - - waitForTopology(5); - - stopGrid(3); - - waitForTopology(4); - - startGrid(3); - - waitForTopology(5); - } - - /** - * @throws Exception If failed. - */ - public void testCustomEventsSimple1_SingleNode() throws Exception { - Ignite srv0 = startGrid(0); - - srv0.createCache(new CacheConfiguration<>("c1")); - - waitForEventsAcks(srv0); - } - - /** - * @throws Exception If failed. - */ - public void testCustomEventsSimple1_5_Nodes() throws Exception { - Ignite srv0 = startGrids(5); - - srv0.createCache(new CacheConfiguration<>("c1")); - - awaitPartitionMapExchange(); - - waitForEventsAcks(srv0); - } - - /** - * @throws Exception If failed. - */ - public void testSegmentation1() throws Exception { - sesTimeout = 1000; - testSockNio = true; - - Ignite node0 = startGrid(0); - - final CountDownLatch l = new CountDownLatch(1); - - node0.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event event) { - l.countDown(); - - return false; - } - }, EventType.EVT_NODE_SEGMENTED); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(true); - - for (int i = 0; i < 10; i++) { - Thread.sleep(1_000); - - if (l.getCount() == 0) - break; - } - - info("Allow connect"); - - c0.allowConnect(); - - assertTrue(l.await(10, TimeUnit.SECONDS)); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore1() throws Exception { - testSockNio = true; - - Ignite node0 = startGrid(0); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(false); - - startGrid(1); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore2() throws Exception { - testSockNio = true; - - Ignite node0 = startGrid(0); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(false); - - startGridsMultiThreaded(1, 5); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_NonCoordinator1() throws Exception { - connectionRestore_NonCoordinator(false); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_NonCoordinator2() throws Exception { - connectionRestore_NonCoordinator(true); - } - - /** - * @param failWhenDisconnected {@code True} if fail node while another node is disconnected. - * @throws Exception If failed. - */ - private void connectionRestore_NonCoordinator(boolean failWhenDisconnected) throws Exception { - testSockNio = true; - - Ignite node0 = startGrid(0); - Ignite node1 = startGrid(1); - - ZkTestClientCnxnSocketNIO c1 = ZkTestClientCnxnSocketNIO.forNode(node1); - - c1.closeSocket(true); - - IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - try { - startGrid(2); - } - catch (Exception e) { - info("Start error: " + e); - } - - return null; - } - }, "start-node"); - - checkEvents(node0, joinEvent(3)); - - if (failWhenDisconnected) { - ZookeeperDiscoverySpi spi = spis.get(getTestIgniteInstanceName(2)); - - closeZkClient(spi); - - checkEvents(node0, failEvent(4)); - } - - c1.allowConnect(); - - checkEvents(ignite(1), joinEvent(3)); - - if (failWhenDisconnected) { - checkEvents(ignite(1), failEvent(4)); - - IgnitionEx.stop(getTestIgniteInstanceName(2), true, true); - } - - fut.get(); - - waitForTopology(failWhenDisconnected ? 2 : 3); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator1() throws Exception { - connectionRestore_Coordinator(1, 1, 0); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator1_1() throws Exception { - connectionRestore_Coordinator(1, 1, 1); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator2() throws Exception { - connectionRestore_Coordinator(1, 3, 0); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator3() throws Exception { - connectionRestore_Coordinator(3, 3, 0); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator4() throws Exception { - connectionRestore_Coordinator(3, 3, 1); - } - - /** - * @param initNodes Number of initially started nodes. - * @param startNodes Number of nodes to start after coordinator loose connection. - * @param failCnt Number of nodes to stop after coordinator loose connection. - * @throws Exception If failed. - */ - private void connectionRestore_Coordinator(int initNodes, int startNodes, int failCnt) throws Exception { - sesTimeout = 30_000; - testSockNio = true; - - Ignite node0 = startGrids(initNodes); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(true); - - final AtomicInteger nodeIdx = new AtomicInteger(initNodes); - - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - try { - startGrid(nodeIdx.getAndIncrement()); - } - catch (Exception e) { - error("Start failed: " + e); - } - - return null; - } - }, startNodes, "start-node"); - - int cnt = 0; - - DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes - failCnt]; - - int expEvtCnt = 0; - - sesTimeout = 1000; - - List<ZkTestClientCnxnSocketNIO> blockedC = new ArrayList<>(); - - for (int i = initNodes; i < initNodes + startNodes; i++) { - ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i)); - - ZookeeperDiscoveryImpl impl = GridTestUtils.getFieldValue(spi, "impl"); - - impl.waitConnectStart(); - - if (cnt++ < failCnt) { - ZkTestClientCnxnSocketNIO c = ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i)); - - c.closeSocket(true); - - blockedC.add(c); - } - else { - expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1); - - expEvtCnt++; - } - } - - Thread.sleep(5000); - - c0.allowConnect(); - - for (ZkTestClientCnxnSocketNIO c : blockedC) - c.allowConnect(); - - if (expEvts.length > 0) { - for (int i = 0; i < initNodes; i++) - checkEvents(ignite(i), expEvts); - } - - fut.get(); - - waitForTopology(initNodes + startNodes - failCnt); - } - - /** - * @throws Exception If failed. - */ - public void testClusterRestart() throws Exception { - startGridsMultiThreaded(3, false); - - stopAllGrids(); - - evts.clear(); - - startGridsMultiThreaded(3, false); - - waitForTopology(3); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore4() throws Exception { - testSockNio = true; - - Ignite node0 = startGrid(0); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(false); - - startGrid(1); - } - - /** - * @throws Exception If failed. - */ - public void testStartStop_1_Node() throws Exception { - startGrid(0); - - waitForTopology(1); - - stopGrid(0); - } - - /** - * @throws Exception If failed. - */ - public void testRestarts_2_Nodes() throws Exception { - startGrid(0); - - for (int i = 0; i < 10; i++) { - info("Iteration: " + i); - - startGrid(1); - - waitForTopology(2); - - stopGrid(1); - } - } - - /** - * @throws Exception If failed. - */ - public void testStartStop_2_Nodes_WithCache() throws Exception { - startGrids(2); - - for (Ignite node : G.allGrids()) { - IgniteCache cache = node.cache(DEFAULT_CACHE_NAME); - - assertNotNull(cache); - - for (int i = 0; i < 100; i++) { - cache.put(i, node.name()); - - assertEquals(node.name(), cache.get(i)); - } - } - - awaitPartitionMapExchange(); - } - - /** - * @throws Exception If failed. - */ - public void testStartStop_2_Nodes() throws Exception { - startGrid(0); - - waitForTopology(1); - - startGrid(1); - - waitForTopology(2); - - for (Ignite node : G.allGrids()) - node.compute().broadcast(new DummyCallable(null)); - - awaitPartitionMapExchange(); - - waitForEventsAcks(ignite(0)); - } - - /** - * @throws Exception If failed. - */ - public void testStartStop1() throws Exception { - startGridsMultiThreaded(5, false); - - waitForTopology(5); - - awaitPartitionMapExchange(); - - waitForEventsAcks(ignite(0)); - - stopGrid(0); - - waitForTopology(4); - - for (Ignite node : G.allGrids()) - node.compute().broadcast(new DummyCallable(null)); - - startGrid(0); - - waitForTopology(5); - - awaitPartitionMapExchange(); - - waitForEventsAcks(grid(CU.oldest(ignite(1).cluster().nodes()))); - } - - /** - * @param node Node. - * @throws Exception If failed. - */ - private void waitForEventsAcks(final Ignite node) throws Exception { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - Map<Object, Object> evts = GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(), - "impl", "evtsData", "evts"); - - if (!evts.isEmpty()) { - info("Unacked events: " + evts); - - return false; - } - - return true; - } - }, 10_000)); - } - - /** - * @throws Exception If failed. - */ - public void testStartStop2() throws Exception { - startGridsMultiThreaded(10, false); - - GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { - @Override public void apply(Integer idx) { - stopGrid(idx); - } - }, 3, "stop-node-thread"); - - waitForTopology(7); - - startGridsMultiThreaded(0, 3); - - waitForTopology(10); - } - - /** - * @throws Exception If failed. - */ - public void testStartStopWithClients() throws Exception { - final int SRVS = 3; - - startGrids(SRVS); - - client = true; - - final int THREADS = 30; - - for (int i = 0; i < 5; i++) { - info("Iteration: " + i); - - startGridsMultiThreaded(SRVS, THREADS); - - waitForTopology(SRVS + THREADS); - - GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { - @Override public void apply(Integer idx) { - stopGrid(idx + SRVS); - } - }, THREADS, "stop-node"); - - waitForTopology(SRVS); - - checkEventsConsistency(); - } - } - - /** - * - */ - private void reset() { - System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); - - ZkTestClientCnxnSocketNIO.reset(); - - System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); - - err = false; - - evts.clear(); - } - - /** - * @throws Exception If failed. - */ - private void checkEventsConsistency() throws Exception { - for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : evts.entrySet()) { - UUID nodeId = nodeEvtEntry.getKey(); - Map<Long, DiscoveryEvent> nodeEvts = nodeEvtEntry.getValue(); - - for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry0 : evts.entrySet()) { - if (!nodeId.equals(nodeEvtEntry0.getKey())) { - Map<Long, DiscoveryEvent> nodeEvts0 = nodeEvtEntry0.getValue(); - - synchronized (nodeEvts) { - synchronized (nodeEvts0) { - checkEventsConsistency(nodeEvts, nodeEvts0); - } - } - } - } - } - } - - /** - * @param evts1 Received events. - * @param evts2 Received events. - */ - private void checkEventsConsistency(Map<Long, DiscoveryEvent> evts1, Map<Long, DiscoveryEvent> evts2) { - for (Map.Entry<Long, DiscoveryEvent> e1 : evts1.entrySet()) { - DiscoveryEvent evt1 = e1.getValue(); - DiscoveryEvent evt2 = evts2.get(e1.getKey()); - - if (evt2 != null) { - assertEquals(evt1.topologyVersion(), evt2.topologyVersion()); - assertEquals(evt1.eventNode(), evt2.eventNode()); - assertEquals(evt1.topologyNodes(), evt2.topologyNodes()); - } - } - } - - /** - * @param nodeName Node name. - * @return Node's discovery SPI. - * @throws Exception If failed. - */ - private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws Exception { - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return spis.contains(nodeName); - } - }, 5000); - - ZookeeperDiscoverySpi spi = spis.get(nodeName); - - assertNotNull("Failed to get SPI for node: " + nodeName, spi); - - return spi; - } - - private static DiscoveryEvent joinEvent(long topVer) { - DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_JOINED, null); - - expEvt.topologySnapshot(topVer, null); - - return expEvt; - } - - private static DiscoveryEvent failEvent(long topVer) { - DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_FAILED, null); - - expEvt.topologySnapshot(topVer, null); - - return expEvt; - } - - /** - * @param node Node. - * @param expEvts Expected events. - * @throws Exception If fialed. - */ - private void checkEvents(final Ignite node, final DiscoveryEvent...expEvts) throws Exception { - checkEvents(node.cluster().localNode().id(), expEvts); - } - - /** - * @param nodeId Node ID. - * @param expEvts Expected events. - * @throws Exception If failed. - */ - private void checkEvents(final UUID nodeId, final DiscoveryEvent...expEvts) throws Exception { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId); - - if (nodeEvts == null) { - info("No events for node: " + nodeId); - - return false; - } - - synchronized (nodeEvts) { - for (DiscoveryEvent expEvt : expEvts) { - DiscoveryEvent evt0 = nodeEvts.get(expEvt.topologyVersion()); - - if (evt0 == null) { - info("No event for version: " + expEvt.topologyVersion()); - - return false; - } - - assertEquals(expEvt.type(), evt0.type()); - } - } - - return true; - } - }, 10000)); - } - - /** - * @param spi Spi instance. - */ - private void closeZkClient(ZookeeperDiscoverySpi spi) { - ZooKeeper zk = GridTestUtils.getFieldValue(spi, "impl", "zkClient", "zk"); - - try { - zk.close(); - } - catch (Exception e) { - fail("Unexpected error: " + e); - } - } - - /** - * @param expSize Expected nodes number. - * @throws Exception If failed. - */ - private void waitForTopology(final int expSize) throws Exception { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - List<Ignite> nodes = G.allGrids(); - - if (nodes.size() != expSize) { - info("Wait all nodes [size=" + nodes.size() + ", exp=" + expSize + ']'); - - return false; - } - - for (Ignite node: nodes) { - int sizeOnNode = node.cluster().nodes().size(); - - if (sizeOnNode != expSize) { - info("Wait for size on node [node=" + node.name() + ", size=" + sizeOnNode + ", exp=" + expSize + ']'); - - return false; - } - } - - return true; - } - }, 5000)); - } - - /** - * - */ - private static class DummyCallable implements IgniteCallable<Object> { - /** */ - private byte[] data; - - /** - * @param data Data. - */ - DummyCallable(byte[] data) { - this.data = data; - } - - /** {@inheritDoc} */ - @Override public Object call() throws Exception { - return data; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/6cb8c06f/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java new file mode 100644 index 0000000..e9e8f68 --- /dev/null +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -0,0 +1,942 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.curator.test.TestingCluster; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; +import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClient; +import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; +import org.apache.zookeeper.ZooKeeper; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD; +import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; + +/** + * + */ +public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { + /** */ + private TestingCluster zkCluster; + + /** */ + private static final boolean USE_TEST_CLUSTER = true; + + /** */ + private boolean client; + + /** */ + private static ConcurrentHashMap<UUID, Map<Long, DiscoveryEvent>> evts = new ConcurrentHashMap<>(); + + /** */ + private static volatile boolean err; + + /** */ + private boolean testSockNio; + + /** */ + private int sesTimeout; + + /** */ + private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + if (testSockNio) + System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, ZkTestClientCnxnSocketNIO.class.getName()); + + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi(); + + zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000); + + spis.put(igniteInstanceName, zkSpi); + + if (USE_TEST_CLUSTER) { + assert zkCluster != null; + + zkSpi.setZkConnectionString(zkCluster.getConnectString()); + } + else + zkSpi.setZkConnectionString("localhost:2181"); + + cfg.setDiscoverySpi(zkSpi); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + // cfg.setMarshaller(new JdkMarshaller()); + + cfg.setClientMode(client); + + Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>(); + + lsnrs.put(new IgnitePredicate<Event>() { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + @Override public boolean apply(Event evt) { + try { + DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt; + + UUID locId = ignite.cluster().localNode().id(); + + Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId); + + if (nodeEvts == null) { + Object old = evts.put(locId, nodeEvts = new TreeMap<>()); + + assertNull(old); + + synchronized (nodeEvts) { + DiscoveryLocalJoinData locJoin = ((IgniteKernal)ignite).context().discovery().localJoin(); + + nodeEvts.put(locJoin.event().topologyVersion(), locJoin.event()); + } + } + + synchronized (nodeEvts) { + DiscoveryEvent old = nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt); + + assertNull(old); + } + } + catch (Throwable e) { + err = true; + + info("Unexpected error: " + e); + } + + return true; + } + }, new int[]{EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT}); + + cfg.setLocalEventListeners(lsnrs); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.clearProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + if (USE_TEST_CLUSTER) { + zkCluster = new TestingCluster(1); + zkCluster.start(); + } + + reset(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + if (zkCluster != null) { + try { + zkCluster.close(); + } + catch (Exception e) { + U.error(log, "Failed to stop Zookeeper client: " + e, e); + } + + zkCluster = null; + } + + super.afterTest(); + + try { + assertFalse("Unexpected error, see log for details", err); + + checkEventsConsistency(); + } + finally { + reset(); + } + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testStopNode_1() throws Exception { + startGrids(5); + + waitForTopology(5); + + stopGrid(3); + + waitForTopology(4); + + startGrid(3); + + waitForTopology(5); + } + + /** + * @throws Exception If failed. + */ + public void testCustomEventsSimple1_SingleNode() throws Exception { + Ignite srv0 = startGrid(0); + + srv0.createCache(new CacheConfiguration<>("c1")); + + waitForEventsAcks(srv0); + } + + /** + * @throws Exception If failed. + */ + public void testCustomEventsSimple1_5_Nodes() throws Exception { + Ignite srv0 = startGrids(5); + + srv0.createCache(new CacheConfiguration<>("c1")); + + awaitPartitionMapExchange(); + + waitForEventsAcks(srv0); + } + + /** + * @throws Exception If failed. + */ + public void testSegmentation1() throws Exception { + sesTimeout = 1000; + testSockNio = true; + + Ignite node0 = startGrid(0); + + final CountDownLatch l = new CountDownLatch(1); + + node0.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event event) { + l.countDown(); + + return false; + } + }, EventType.EVT_NODE_SEGMENTED); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + + c0.closeSocket(true); + + for (int i = 0; i < 10; i++) { + Thread.sleep(1_000); + + if (l.getCount() == 0) + break; + } + + info("Allow connect"); + + c0.allowConnect(); + + assertTrue(l.await(10, TimeUnit.SECONDS)); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore1() throws Exception { + testSockNio = true; + + Ignite node0 = startGrid(0); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + + c0.closeSocket(false); + + startGrid(1); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore2() throws Exception { + testSockNio = true; + + Ignite node0 = startGrid(0); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + + c0.closeSocket(false); + + startGridsMultiThreaded(1, 5); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore_NonCoordinator1() throws Exception { + connectionRestore_NonCoordinator(false); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore_NonCoordinator2() throws Exception { + connectionRestore_NonCoordinator(true); + } + + /** + * @param failWhenDisconnected {@code True} if fail node while another node is disconnected. + * @throws Exception If failed. + */ + private void connectionRestore_NonCoordinator(boolean failWhenDisconnected) throws Exception { + testSockNio = true; + + Ignite node0 = startGrid(0); + Ignite node1 = startGrid(1); + + ZkTestClientCnxnSocketNIO c1 = ZkTestClientCnxnSocketNIO.forNode(node1); + + c1.closeSocket(true); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try { + startGrid(2); + } + catch (Exception e) { + info("Start error: " + e); + } + + return null; + } + }, "start-node"); + + checkEvents(node0, joinEvent(3)); + + if (failWhenDisconnected) { + ZookeeperDiscoverySpi spi = spis.get(getTestIgniteInstanceName(2)); + + closeZkClient(spi); + + checkEvents(node0, failEvent(4)); + } + + c1.allowConnect(); + + checkEvents(ignite(1), joinEvent(3)); + + if (failWhenDisconnected) { + checkEvents(ignite(1), failEvent(4)); + + IgnitionEx.stop(getTestIgniteInstanceName(2), true, true); + } + + fut.get(); + + waitForTopology(failWhenDisconnected ? 2 : 3); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore_Coordinator1() throws Exception { + connectionRestore_Coordinator(1, 1, 0); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore_Coordinator1_1() throws Exception { + connectionRestore_Coordinator(1, 1, 1); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore_Coordinator2() throws Exception { + connectionRestore_Coordinator(1, 3, 0); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore_Coordinator3() throws Exception { + connectionRestore_Coordinator(3, 3, 0); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore_Coordinator4() throws Exception { + connectionRestore_Coordinator(3, 3, 1); + } + + /** + * @param initNodes Number of initially started nodes. + * @param startNodes Number of nodes to start after coordinator loose connection. + * @param failCnt Number of nodes to stop after coordinator loose connection. + * @throws Exception If failed. + */ + private void connectionRestore_Coordinator(int initNodes, int startNodes, int failCnt) throws Exception { + sesTimeout = 30_000; + testSockNio = true; + + Ignite node0 = startGrids(initNodes); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + + c0.closeSocket(true); + + final AtomicInteger nodeIdx = new AtomicInteger(initNodes); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try { + startGrid(nodeIdx.getAndIncrement()); + } + catch (Exception e) { + error("Start failed: " + e); + } + + return null; + } + }, startNodes, "start-node"); + + int cnt = 0; + + DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes - failCnt]; + + int expEvtCnt = 0; + + sesTimeout = 1000; + + List<ZkTestClientCnxnSocketNIO> blockedC = new ArrayList<>(); + + final List<String> failedZkNodes = new ArrayList<>(failCnt); + + for (int i = initNodes; i < initNodes + startNodes; i++) { + ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i)); + + ZookeeperDiscoveryImpl impl = GridTestUtils.getFieldValue(spi, "impl"); + + impl.waitConnectStart(); + + if (cnt++ < failCnt) { + ZkTestClientCnxnSocketNIO c = ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i)); + + c.closeSocket(true); + + blockedC.add(c); + + failedZkNodes.add((String)GridTestUtils.getFieldValue(impl, "locNodeZkPath")); + } + else { + expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1); + + expEvtCnt++; + } + } + + final ZookeeperClient zkClient = new ZookeeperClient(log, zkCluster.getConnectString(), 10_000, null); + + try { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + List<String> c = zkClient.getChildren("/apacheIgnite/default/alive"); + + for (String failedZkNode : failedZkNodes) { + if (c.contains(failedZkNode)) + return false; + } + + return true; + } + catch (Exception e) { + fail(); + + return true; + } + } + }, 10_000)); + } + finally { + zkClient.close(); + } + + c0.allowConnect(); + + for (ZkTestClientCnxnSocketNIO c : blockedC) + c.allowConnect(); + + if (expEvts.length > 0) { + for (int i = 0; i < initNodes; i++) + checkEvents(ignite(i), expEvts); + } + + fut.get(); + + waitForTopology(initNodes + startNodes - failCnt); + } + + /** + * @throws Exception If failed. + */ + public void testClusterRestart() throws Exception { + startGridsMultiThreaded(3, false); + + stopAllGrids(); + + evts.clear(); + + startGridsMultiThreaded(3, false); + + waitForTopology(3); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRestore4() throws Exception { + testSockNio = true; + + Ignite node0 = startGrid(0); + + ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + + c0.closeSocket(false); + + startGrid(1); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop_1_Node() throws Exception { + startGrid(0); + + waitForTopology(1); + + stopGrid(0); + } + + /** + * @throws Exception If failed. + */ + public void testRestarts_2_Nodes() throws Exception { + startGrid(0); + + for (int i = 0; i < 10; i++) { + info("Iteration: " + i); + + startGrid(1); + + waitForTopology(2); + + stopGrid(1); + } + } + + /** + * @throws Exception If failed. + */ + public void testStartStop_2_Nodes_WithCache() throws Exception { + startGrids(2); + + for (Ignite node : G.allGrids()) { + IgniteCache cache = node.cache(DEFAULT_CACHE_NAME); + + assertNotNull(cache); + + for (int i = 0; i < 100; i++) { + cache.put(i, node.name()); + + assertEquals(node.name(), cache.get(i)); + } + } + + awaitPartitionMapExchange(); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop_2_Nodes() throws Exception { + startGrid(0); + + waitForTopology(1); + + startGrid(1); + + waitForTopology(2); + + for (Ignite node : G.allGrids()) + node.compute().broadcast(new DummyCallable(null)); + + awaitPartitionMapExchange(); + + waitForEventsAcks(ignite(0)); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop1() throws Exception { + startGridsMultiThreaded(5, false); + + waitForTopology(5); + + awaitPartitionMapExchange(); + + waitForEventsAcks(ignite(0)); + + stopGrid(0); + + waitForTopology(4); + + for (Ignite node : G.allGrids()) + node.compute().broadcast(new DummyCallable(null)); + + startGrid(0); + + waitForTopology(5); + + awaitPartitionMapExchange(); + + waitForEventsAcks(grid(CU.oldest(ignite(1).cluster().nodes()))); + } + + /** + * @param node Node. + * @throws Exception If failed. + */ + private void waitForEventsAcks(final Ignite node) throws Exception { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + Map<Object, Object> evts = GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(), + "impl", "evtsData", "evts"); + + if (!evts.isEmpty()) { + info("Unacked events: " + evts); + + return false; + } + + return true; + } + }, 10_000)); + } + + /** + * @throws Exception If failed. + */ + public void testStartStop2() throws Exception { + startGridsMultiThreaded(10, false); + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer idx) { + stopGrid(idx); + } + }, 3, "stop-node-thread"); + + waitForTopology(7); + + startGridsMultiThreaded(0, 3); + + waitForTopology(10); + } + + /** + * @throws Exception If failed. + */ + public void testStartStopWithClients() throws Exception { + final int SRVS = 3; + + startGrids(SRVS); + + client = true; + + final int THREADS = 30; + + for (int i = 0; i < 5; i++) { + info("Iteration: " + i); + + startGridsMultiThreaded(SRVS, THREADS); + + waitForTopology(SRVS + THREADS); + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer idx) { + stopGrid(idx + SRVS); + } + }, THREADS, "stop-node"); + + waitForTopology(SRVS); + + checkEventsConsistency(); + } + } + + /** + * + */ + private void reset() { + System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); + + ZkTestClientCnxnSocketNIO.reset(); + + System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); + + err = false; + + evts.clear(); + } + + /** + * @throws Exception If failed. + */ + private void checkEventsConsistency() throws Exception { + for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : evts.entrySet()) { + UUID nodeId = nodeEvtEntry.getKey(); + Map<Long, DiscoveryEvent> nodeEvts = nodeEvtEntry.getValue(); + + for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry0 : evts.entrySet()) { + if (!nodeId.equals(nodeEvtEntry0.getKey())) { + Map<Long, DiscoveryEvent> nodeEvts0 = nodeEvtEntry0.getValue(); + + synchronized (nodeEvts) { + synchronized (nodeEvts0) { + checkEventsConsistency(nodeEvts, nodeEvts0); + } + } + } + } + } + } + + /** + * @param evts1 Received events. + * @param evts2 Received events. + */ + private void checkEventsConsistency(Map<Long, DiscoveryEvent> evts1, Map<Long, DiscoveryEvent> evts2) { + for (Map.Entry<Long, DiscoveryEvent> e1 : evts1.entrySet()) { + DiscoveryEvent evt1 = e1.getValue(); + DiscoveryEvent evt2 = evts2.get(e1.getKey()); + + if (evt2 != null) { + assertEquals(evt1.topologyVersion(), evt2.topologyVersion()); + assertEquals(evt1.eventNode(), evt2.eventNode()); + assertEquals(evt1.topologyNodes(), evt2.topologyNodes()); + } + } + } + + /** + * @param nodeName Node name. + * @return Node's discovery SPI. + * @throws Exception If failed. + */ + private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws Exception { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return spis.contains(nodeName); + } + }, 5000); + + ZookeeperDiscoverySpi spi = spis.get(nodeName); + + assertNotNull("Failed to get SPI for node: " + nodeName, spi); + + return spi; + } + + private static DiscoveryEvent joinEvent(long topVer) { + DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_JOINED, null); + + expEvt.topologySnapshot(topVer, null); + + return expEvt; + } + + private static DiscoveryEvent failEvent(long topVer) { + DiscoveryEvent expEvt = new DiscoveryEvent(null, null, EventType.EVT_NODE_FAILED, null); + + expEvt.topologySnapshot(topVer, null); + + return expEvt; + } + + /** + * @param node Node. + * @param expEvts Expected events. + * @throws Exception If fialed. + */ + private void checkEvents(final Ignite node, final DiscoveryEvent...expEvts) throws Exception { + checkEvents(node.cluster().localNode().id(), expEvts); + } + + /** + * @param nodeId Node ID. + * @param expEvts Expected events. + * @throws Exception If failed. + */ + private void checkEvents(final UUID nodeId, final DiscoveryEvent...expEvts) throws Exception { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId); + + if (nodeEvts == null) { + info("No events for node: " + nodeId); + + return false; + } + + synchronized (nodeEvts) { + for (DiscoveryEvent expEvt : expEvts) { + DiscoveryEvent evt0 = nodeEvts.get(expEvt.topologyVersion()); + + if (evt0 == null) { + info("No event for version: " + expEvt.topologyVersion()); + + return false; + } + + assertEquals(expEvt.type(), evt0.type()); + } + } + + return true; + } + }, 10000)); + } + + /** + * @param spi Spi instance. + */ + private void closeZkClient(ZookeeperDiscoverySpi spi) { + ZooKeeper zk = GridTestUtils.getFieldValue(spi, "impl", "zkClient", "zk"); + + try { + zk.close(); + } + catch (Exception e) { + fail("Unexpected error: " + e); + } + } + + /** + * @param expSize Expected nodes number. + * @throws Exception If failed. + */ + private void waitForTopology(final int expSize) throws Exception { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + List<Ignite> nodes = G.allGrids(); + + if (nodes.size() != expSize) { + info("Wait all nodes [size=" + nodes.size() + ", exp=" + expSize + ']'); + + return false; + } + + for (Ignite node: nodes) { + int sizeOnNode = node.cluster().nodes().size(); + + if (sizeOnNode != expSize) { + info("Wait for size on node [node=" + node.name() + ", size=" + sizeOnNode + ", exp=" + expSize + ']'); + + return false; + } + } + + return true; + } + }, 5000)); + } + + /** + * + */ + private static class DummyCallable implements IgniteCallable<Object> { + /** */ + private byte[] data; + + /** + * @param data Data. + */ + DummyCallable(byte[] data) { + this.data = data; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + return data; + } + } +}