http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java index 58b2102..7c54dc3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java @@ -17,15 +17,23 @@ package org.apache.ignite.internal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.UUID; +import junit.framework.AssertionFailedError; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCompute; +import org.apache.ignite.Ignition; +import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; /** @@ -35,6 +43,9 @@ public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest { /** */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + /** */ + private boolean client; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -43,6 +54,8 @@ public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest { cfg.setMetricsUpdateFrequency(500); + cfg.setClientMode(client); + return cfg; } @@ -50,32 +63,85 @@ public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testMetrics() throws Exception { - //IgnitionEx.TEST_ZK = false; + int NODES = 6; - Ignite srv0 = startGrids(3); + Ignite srv0 = startGridsMultiThreaded(NODES / 2); - IgniteCompute c1 = srv0.compute(srv0.cluster().forNodeId(nodeId(1))); - IgniteCompute c2 = srv0.compute(srv0.cluster().forNodeId(nodeId(2))); + client = true; - c1.call(new DummyCallable(null)); + startGridsMultiThreaded(NODES / 2, NODES / 2); - Thread.sleep(3000); + Map<UUID, Integer> expJobs = new HashMap<>(); - Ignite srv1 = ignite(0); + for (int i = 0; i < NODES; i++) + expJobs.put(nodeId(i), 0); - System.out.println(srv1.cluster().forNodeId(nodeId(0)).metrics().getAverageCpuLoad()); - System.out.println(srv1.cluster().forNodeId(nodeId(1)).metrics().getAverageCpuLoad()); - System.out.println(srv1.cluster().forNodeId(nodeId(2)).metrics().getAverageCpuLoad()); + checkMetrics(NODES, expJobs); - Thread.sleep(3000); + for (int i = 0; i < NODES; i++) { + UUID nodeId = nodeId(i); - System.out.println(srv1.cluster().forNodeId(nodeId(0)).metrics().getTotalExecutedJobs()); - System.out.println(srv1.cluster().forNodeId(nodeId(1)).metrics().getTotalExecutedJobs()); - System.out.println(srv1.cluster().forNodeId(nodeId(2)).metrics().getTotalExecutedJobs()); + IgniteCompute c = srv0.compute(srv0.cluster().forNodeId(nodeId(i))); + + c.call(new DummyCallable(null)); + + expJobs.put(nodeId, 1); + } } - private UUID nodeId(int nodeIdx) { - return ignite(nodeIdx).cluster().localNode().id(); + /** + * @param expNodes Expected nodes. + * @param expJobs Expected jobs number per node. + */ + private void checkMetrics0(int expNodes, Map<UUID, Integer> expJobs) { + List<Ignite> nodes = Ignition.allGrids(); + + assertEquals(expNodes, nodes.size()); + assertEquals(expNodes, expJobs.size()); + + int totalJobs = 0; + + for (Integer c : expJobs.values()) + totalJobs += c; + + for (final Ignite ignite : nodes) { + ClusterMetrics m = ignite.cluster().metrics(); + + assertEquals(expNodes, m.getTotalNodes()); + assertEquals(totalJobs, m.getTotalExecutedJobs()); + + for (Map.Entry<UUID, Integer> e : expJobs.entrySet()) { + UUID nodeId = e.getKey(); + + ClusterGroup g = ignite.cluster().forNodeId(nodeId); + + ClusterMetrics nodeM = g.metrics(); + + assertEquals(e.getValue(), (Integer)nodeM.getTotalExecutedJobs()); + } + } + } + + /** + * @param expNodes Expected nodes. + * @param expJobs Expected jobs number per node. + * @throws Exception If failed. + */ + private void checkMetrics(final int expNodes, final Map<UUID, Integer> expJobs) throws Exception { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + checkMetrics0(expNodes, expJobs); + } + catch (AssertionFailedError e) { + return false; + } + + return true; + } + }, 5000); + + checkMetrics0(expNodes, expJobs); } /**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 4ccafa2..2db8c5f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -37,6 +37,7 @@ import org.apache.curator.test.TestingCluster; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -55,13 +56,19 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.logger.java.JavaLogger; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; import org.apache.zookeeper.ZooKeeper; +import org.jetbrains.annotations.Nullable; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -387,7 +394,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testSegmentation1() throws Exception { - sesTimeout = 1000; + sesTimeout = 2000; testSockNio = true; Ignite node0 = startGrid(0); @@ -608,7 +615,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { blockedC.add(c); - failedZkNodes.add((String)GridTestUtils.getFieldValue(impl, "locNodeZkPath")); + failedZkNodes.add(aliveZkNodePath(spi)); } else { expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1); @@ -617,7 +624,35 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } } - final ZookeeperClient zkClient = new ZookeeperClient(log, zkCluster.getConnectString(), 10_000, null); + waitNoAliveZkNodes(failedZkNodes); + + c0.allowConnect(); + + for (ZkTestClientCnxnSocketNIO c : blockedC) + c.allowConnect(); + + if (expEvts.length > 0) { + for (int i = 0; i < initNodes; i++) + checkEvents(ignite(i), expEvts); + } + + fut.get(); + + waitForTopology(initNodes + startNodes - failCnt); + } + + private static String aliveZkNodePath(Ignite node) { + return aliveZkNodePath(node.configuration().getDiscoverySpi()); + } + + private static String aliveZkNodePath(DiscoverySpi spi) { + String path = GridTestUtils.getFieldValue(spi, "impl", "state", "locNodeZkPath"); + + return path.substring(path.lastIndexOf('/') + 1); + } + + private static void waitNoAliveZkNodes(final List<String> failedZkNodes) throws Exception { + final ZookeeperClient zkClient = new ZookeeperClient(new JavaLogger(), zkCluster.getConnectString(), 10_000, null); try { assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { @@ -643,20 +678,6 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { finally { zkClient.close(); } - - c0.allowConnect(); - - for (ZkTestClientCnxnSocketNIO c : blockedC) - c.allowConnect(); - - if (expEvts.length > 0) { - for (int i = 0; i < initNodes; i++) - checkEvents(ignite(i), expEvts); - } - - fut.get(); - - waitForTopology(initNodes + startNodes - failCnt); } /** @@ -1062,6 +1083,21 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testClientReconnect1() throws Exception { + startGrid(0); + + sesTimeout = 2000; + testSockNio = true; + client = true; + + Ignite client = startGrid(1); + + reconnectClientNodes(log, Collections.singletonList(client), null); + } + + /** * @param restartZk If {@code true} in background restarts on of ZK servers. * @param closeClientSock If {@code true} in background closes zk clients' sockets. * @throws Exception If failed. @@ -1432,6 +1468,79 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } }, 5000)); } + /** + * Reconnect client node. + * + * @param log Logger. + * @param clients Clients. + * @param disconnectedC Closure which will be run when client node disconnected. + * @throws Exception If failed. + */ + static void reconnectClientNodes(final IgniteLogger log, + List<Ignite> clients, + @Nullable Runnable disconnectedC) + throws Exception { + final CountDownLatch disconnectLatch = new CountDownLatch(clients.size()); + final CountDownLatch reconnectLatch = new CountDownLatch(clients.size()); + + IgnitePredicate<Event> p = new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + log.info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + log.info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }; + + List<String> zkNodes = new ArrayList<>(); + + for (Ignite client : clients) { + client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + zkNodes.add(aliveZkNodePath(client)); + } + + for (Ignite client : clients) + ZkTestClientCnxnSocketNIO.forNode(client.name()).closeSocket(true); + + waitNoAliveZkNodes(zkNodes); + + for (Ignite client : clients) + ZkTestClientCnxnSocketNIO.forNode(client.name()).allowConnect(); + + waitReconnectEvent(log, disconnectLatch); + + if (disconnectedC != null) + disconnectedC.run(); + + waitReconnectEvent(log, reconnectLatch); + + for (Ignite client : clients) + client.events().stopLocalListen(p); + } + + /** + * @param log Logger. + * @param latch Latch. + * @throws Exception If failed. + */ + protected static void waitReconnectEvent(IgniteLogger log, CountDownLatch latch) throws Exception { + if (!latch.await(30_000, MILLISECONDS)) { + log.error("Failed to wait for reconnect event, will dump threads, latch count: " + latch.getCount()); + + U.dumpThreads(log); + + fail("Failed to wait for disconnect/reconnect event."); + } + } /** * http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 2439117..437ce4d 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1173,6 +1173,14 @@ public abstract class GridAbstractTest extends TestCase { } /** + * @param nodeIdx Node index. + * @return Node ID. + */ + protected final UUID nodeId(int nodeIdx) { + return ignite(nodeIdx).cluster().localNode().id(); + } + + /** * Gets grid for given test. * * @return Grid for given test. http://git-wip-us.apache.org/repos/asf/ignite/blob/1ccbac03/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java index ac3de73..7f641e5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java @@ -19,6 +19,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.ClusterNodeMetricsSelfTest; +import org.apache.ignite.internal.ClusterNodeMetricsUpdateTest; import org.apache.ignite.internal.GridAffinityNoCacheSelfTest; import org.apache.ignite.internal.GridAffinitySelfTest; import org.apache.ignite.internal.GridAlwaysFailoverSpiFailSelfTest; @@ -120,6 +121,7 @@ public class IgniteComputeGridTestSuite { suite.addTestSuite(GridAlwaysFailoverSpiFailSelfTest.class); suite.addTestSuite(GridTaskInstanceExecutionSelfTest.class); suite.addTestSuite(ClusterNodeMetricsSelfTest.class); + suite.addTestSuite(ClusterNodeMetricsUpdateTest.class); suite.addTestSuite(GridNonHistoryMetricsSelfTest.class); suite.addTestSuite(GridCancelledJobsMetricsSelfTest.class); suite.addTestSuite(GridCollisionJobsContextSelfTest.class);
