This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git

The following commit(s) were added to refs/heads/master by this push: new 6f3c277b446 IGNITE-11650 Fixed flaky TcpCommunicationSpiFreezingClientTest test (#9975) 6f3c277b446 is described below commit 6f3c277b4460aec15c33c76f92fdb33d6430370e Author: Nikita Amelchev <nsamelc...@gmail.com> AuthorDate: Thu Apr 14 18:50:02 2022 +0300 IGNITE-11650 Fixed flaky TcpCommunicationSpiFreezingClientTest test (#9975) --- .../ignite/internal/util/IgniteDevOnlyLogTest.java | 12 +- .../tcp/TcpCommunicationSpiFreezingClientTest.java | 202 ++++++++++----------- .../testframework/junits/GridAbstractTest.java | 2 +- 3 files changed, 99 insertions(+), 117 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteDevOnlyLogTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteDevOnlyLogTest.java index 0dd792f8bad..b939771c9fb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteDevOnlyLogTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteDevOnlyLogTest.java @@ -62,9 +62,9 @@ public class IgniteDevOnlyLogTest extends GridCommonAbstractTest { public void testDevOnlyQuietMessage() throws Exception { additionalArgs = Collections.singletonList("-D" + IgniteSystemProperties.IGNITE_QUIET + "=true"); - log = new GridStringLogger(false, grid(0).log()); + GridStringLogger log = new GridStringLogger(false, grid(0).log()); - Ignite ignite = startGrid(1); + Ignite ignite = startGrid(optimize(getConfiguration(getTestIgniteInstanceName(1)).setGridLogger(log))); String msg = getMessage(ignite); @@ -78,9 +78,9 @@ public class IgniteDevOnlyLogTest extends GridCommonAbstractTest { public void testDevOnlyVerboseMessage() throws Exception { additionalArgs = Collections.singletonList("-D" + IgniteSystemProperties.IGNITE_QUIET + "=false"); - log = new GridStringLogger(false, grid(0).log()); + GridStringLogger log = new GridStringLogger(false, grid(0).log()); - Ignite ignite = startGrid(1); + Ignite 
ignite = startGrid(optimize(getConfiguration(getTestIgniteInstanceName(1)).setGridLogger(log))); String msg = getMessage(ignite); @@ -99,9 +99,9 @@ public class IgniteDevOnlyLogTest extends GridCommonAbstractTest { additionalArgs = Collections.singletonList("-D" + IgniteSystemProperties.IGNITE_DEV_ONLY_LOGGING_DISABLED + "=true"); - log = new GridStringLogger(false, grid(0).log()); + GridStringLogger log = new GridStringLogger(false, grid(0).log()); - Ignite ignite = startGrid(1); + Ignite ignite = startGrid(optimize(getConfiguration(getTestIgniteInstanceName(1)).setGridLogger(log))); String msg = getMessage(ignite); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFreezingClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFreezingClientTest.java index 1e32a0a8acd..c81a7016a5f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFreezingClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFreezingClientTest.java @@ -17,78 +17,65 @@ package org.apache.ignite.spi.communication.tcp; -import java.lang.management.ManagementFactory; -import java.util.Iterator; -import javax.cache.Cache; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.Ignition; import org.apache.ignite.cluster.ClusterTopologyException; -import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.lang.IgniteCallable; -import 
org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.LoggerResource; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.nio.GridCommunicationClient; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.junits.GridAbstractTest; +import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; -import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** * Tests that freezing due to JVM STW client will be failed if connection can't be established. */ +@WithSystemProperty(key = IGNITE_ENABLE_FORCIBLE_NODE_KILL, value = "true") public class TcpCommunicationSpiFreezingClientTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + /** Message to catch GC start on a client. */ + private static final String GC_START_MSG = "Try to start GC."; + + /** Last GC start time. 
*/ + private final AtomicLong lastGC = new AtomicLong(Long.MAX_VALUE); /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - cfg.setFailureDetectionTimeout(120000); - cfg.setClientFailureDetectionTimeout(120000); + cfg.setFailureDetectionTimeout(getTestTimeout()); + cfg.setClientFailureDetectionTimeout(getTestTimeout()); TcpCommunicationSpi spi = new TcpCommunicationSpi(); - spi.setConnectTimeout(3000); - spi.setMaxConnectTimeout(6000); - spi.setReconnectCount(3); + spi.setConnectTimeout(1000); + spi.setMaxConnectTimeout(1000); spi.setIdleConnectionTimeout(100); spi.setSharedMemoryPort(-1); - TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - cfg.setCommunicationSpi(spi); - cfg.setDiscoverySpi(discoSpi); - - cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).setWriteSynchronizationMode(FULL_SYNC). - setCacheMode(PARTITIONED).setAtomicityMode(ATOMIC)); - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); + ListeningTestLogger log = new ListeningTestLogger(GridAbstractTest.log); - System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, "true"); - } + log.registerListener((s) -> { + if (s.contains(GC_START_MSG)) + lastGC.set(System.currentTimeMillis()); + }); - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); + cfg.setGridLogger(log); - System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL); + return cfg; } /** {@inheritDoc} */ @@ -96,91 +83,86 @@ public class TcpCommunicationSpiFreezingClientTest extends GridCommonAbstractTes return true; } - /** - * @throws Exception If failed. - */ + /** @throws Exception If failed. 
*/ @Test public void testFreezingClient() throws Exception { - try { - final IgniteEx srv = startGrids(2); + Ignite srv = startGrid(0); + Ignite client = startClientGrid("client"); + IgniteCompute compute = srv.compute(srv.cluster().forNode(client.cluster().localNode())).withNoFailover(); - final IgniteEx client = startClientGrid(3); + // Close communication connections by idle and trigger STW on the client. + compute.runAsync(() -> { + waitConnectionsClosed(Ignition.localIgnite()); - final int keysCnt = 100_000; + triggerSTW(); + }); - try (IgniteDataStreamer<Integer, byte[]> streamer = srv.dataStreamer(DEFAULT_CACHE_NAME)) { - for (int i = 0; i < keysCnt; i++) - streamer.addData(i, new byte[512]); - } - - // Wait for connections go idle. - doSleep(1000); + while (!Thread.interrupted()) { + // Make sure connections closed on the server. + waitConnectionsClosed(srv); - srv.compute(srv.cluster().forNode(client.localNode())).withNoFailover().call(new ClientClosure()); + // Make sure that the client is freezed by STW. + assertTrue(waitForCondition(() -> System.currentTimeMillis() - lastGC.get() > 1000, getTestTimeout())); - fail("Client node must be kicked from topology"); + // Open new connection to the freezed client. Retry if client has completed GC and was not freezed. + try { + compute.run(() -> {}); + } + catch (ClusterTopologyException ignored) { + break; + } } - catch (ClusterTopologyException e) { - // Expected. 
- e.printStackTrace(); - - System.out.println(e); - } - finally { - stopAllGrids(); - } + assertEquals(1, srv.cluster().nodes().size()); } - /** */ - public static class ClientClosure implements IgniteCallable<Integer> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @IgniteInstanceResource - private transient Ignite ignite; - - /** */ - @LoggerResource - private IgniteLogger log; - - /** {@inheritDoc} */ - @Override public Integer call() throws Exception { - Thread loadThread = new Thread(() -> log.info("result = " + simulateLoad())); - - loadThread.setName("load-thread"); - loadThread.start(); - - int cnt = 0; + /** Triggers STW. */ + private void triggerSTW() { + long end = System.currentTimeMillis() + getTestTimeout(); - final Iterator<Cache.Entry<Integer, byte[]>> it = ignite.cache(DEFAULT_CACHE_NAME). - query(new ScanQuery<Integer, byte[]>().setPageSize(100000)).iterator(); + while (!Thread.interrupted() && (System.currentTimeMillis() < end)) { + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(this::simulateLoad); - while (it.hasNext()) { - Cache.Entry<Integer, byte[]> entry = it.next(); + while (!fut.isDone()) { + System.out.println(GC_START_MSG); - // Trigger STW. - final long[] tids = ManagementFactory.getThreadMXBean().findDeadlockedThreads(); - - cnt++; + GridTestUtils.runGC(); } + } + } - loadThread.join(); + /** Simulate load without safepoints to block GC. */ + public double simulateLoad() { + double d = 0; - return cnt; - } + for (int i = 0; i < Integer.MAX_VALUE; i++) + d += Math.log(Math.PI * i); - /** - * - */ - public static double simulateLoad() { - double d = 0; + return d; + } - for (int i = 0; i < 1000000000; i++) - d += Math.log(Math.PI * i); + /** Waits for all communication connections closed by idle. 
*/ + private void waitConnectionsClosed(Ignite node) { + TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi(); + Map<UUID, GridCommunicationClient[]> clientsMap = GridTestUtils.getFieldValue(spi, "clientPool", "clients"); - return d; + try { + assertTrue(waitForCondition(() -> { + for (GridCommunicationClient[] clients : clientsMap.values()) { + if (clients == null) + continue; + + for (GridCommunicationClient client : clients) { + if (client != null) + return false; + } + } + + return true; + }, getTestTimeout())); + } + catch (IgniteInterruptedCheckedException e) { + throw U.convertException(e); } } } diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 615b9adcb6c..b895ab6a6ed 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1409,7 +1409,7 @@ public abstract class GridAbstractTest extends JUnitAssertAware { } } - return new IgniteProcessProxy(cfg, log, (x) -> grid(0), resetDiscovery, additionalRemoteJvmArgs()); + return new IgniteProcessProxy(cfg, cfg.getGridLogger(), (x) -> grid(0), resetDiscovery, additionalRemoteJvmArgs()); } /**