This is an automated email from the ASF dual-hosted git repository.

mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 969f21a3aa8 IGNITE-28041 Fixed flaky ClientSlowDiscoveryAbstractTest (#12845)
969f21a3aa8 is described below

commit 969f21a3aa877ee970d9e77750b031596ded1926
Author: Mikhail Petrov <[email protected]>
AuthorDate: Thu Mar 5 14:59:10 2026 +0300

    IGNITE-28041 Fixed flaky ClientSlowDiscoveryAbstractTest (#12845)
---
 .../cache/ClientSlowDiscoveryAbstractTest.java     | 66 +++++++--------
 .../ClientSlowDiscoveryTopologyChangeTest.java     | 99 ++++++++++------------
 .../ClientSlowDiscoveryTransactionRemapTest.java   | 40 +++------
 3 files changed, 93 insertions(+), 112 deletions(-)

diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
index 7b437382a48..d81ef052420 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.function.Supplier;
-import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -26,52 +24,49 @@ import 
org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.spi.communication.CommunicationSpi;
-import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
-/**
- *
- */
-public class ClientSlowDiscoveryAbstractTest extends GridCommonAbstractTest {
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/** */
+abstract class ClientSlowDiscoveryAbstractTest extends GridCommonAbstractTest {
     /** Cache name. */
     protected static final String CACHE_NAME = "cache";
 
-    /** Cache configuration. */
-    private final CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME)
-        .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
-        .setReadFromBackup(false)
-        .setBackups(1)
-        .setAffinity(new RendezvousAffinityFunction(false, 64));
-
-    /** Communication SPI supplier. */
-    protected Supplier<CommunicationSpi> communicationSpiSupplier = 
TestRecordingCommunicationSpi::new;
-
-    /** Discovery SPI supplier. */
-    protected Supplier<DiscoverySpi> discoverySpiSupplier = 
TcpDiscoverySpi::new;
-
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+        return super.getConfiguration(igniteInstanceName)
+            .setConsistentId(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setCacheConfiguration(new CacheConfiguration<>(CACHE_NAME)
+                .setAtomicityMode(TRANSACTIONAL)
+                .setReadFromBackup(false)
+                .setBackups(1)
+                .setAffinity(new RendezvousAffinityFunction(false, 64)));
+    }
+
+    /** */
+    protected IgniteConfiguration getConfiguration(int nodeIdx, 
TcpDiscoverySpi discoSpi) throws Exception {
+        IgniteConfiguration cfg = 
getConfiguration(getTestIgniteInstanceName(nodeIdx));
 
-        cfg.setConsistentId(igniteInstanceName);
-        cfg.setCacheConfiguration(ccfg);
-        cfg.setCommunicationSpi(communicationSpiSupplier.get());
-        cfg.setDiscoverySpi(discoverySpiSupplier.get());
+        
cfg.setDiscoverySpi(discoSpi.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
 
         return cfg;
     }
 
-    /**
-     *
-     */
+    /** */
     static class NodeJoinInterceptingDiscoverySpi extends TcpDiscoverySpi {
         /** Interceptor. */
-        protected volatile IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage> 
interceptor;
+        private final IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage> 
interceptor;
+
+        /** */
+        
NodeJoinInterceptingDiscoverySpi(IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage>
 interceptor) {
+            this.interceptor = interceptor;
+        }
 
         /** {@inheritDoc} */
         @Override protected void 
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
@@ -80,12 +75,15 @@ public class ClientSlowDiscoveryAbstractTest extends 
GridCommonAbstractTest {
         }
     }
 
-    /**
-     *
-     */
+    /** */
     static class CustomMessageInterceptingDiscoverySpi extends TcpDiscoverySpi 
{
         /** Interceptor. */
-        protected volatile IgniteInClosure<DiscoveryCustomMessage> interceptor;
+        private final IgniteInClosure<DiscoveryCustomMessage> interceptor;
+
+        /** */
+        
CustomMessageInterceptingDiscoverySpi(IgniteInClosure<DiscoveryCustomMessage> 
interceptor) {
+            this.interceptor = interceptor;
+        }
 
         /** {@inheritDoc} */
         @Override protected void 
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java
index 6a3c0214096..a202bf89a7e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
@@ -28,31 +29,22 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
-/**
- *
- */
+/** */
 public class ClientSlowDiscoveryTopologyChangeTest extends 
ClientSlowDiscoveryAbstractTest {
-    /**
-     *
-     */
-    @Before
-    public void before() throws Exception {
+    /** {@inheritDoc} */
+    @Override public void beforeTest() throws Exception {
         stopAllGrids();
 
         cleanPersistenceDir();
     }
 
-    /**
-     *
-     */
-    @After
-    public void after() throws Exception {
+    /** {@inheritDoc} */
+    @Override public void afterTest() throws Exception {
         stopAllGrids();
 
         cleanPersistenceDir();
@@ -75,53 +67,25 @@ public class ClientSlowDiscoveryTopologyChangeTest extends 
ClientSlowDiscoveryAb
         for (int k = 0; k < 64; k++)
             crd.cache(CACHE_NAME).put(k, k);
 
-        TestRecordingCommunicationSpi clientCommSpi = new 
TestRecordingCommunicationSpi();
-
-        // Delay client join process.
-        clientCommSpi.blockMessages((node, msg) -> {
-            if (!(msg instanceof GridDhtPartitionsSingleMessage))
-                return false;
+        CountDownLatch cliDiscoSpiUnblockedLatch = new CountDownLatch(1);
 
-            GridDhtPartitionsSingleMessage singleMsg = 
(GridDhtPartitionsSingleMessage)msg;
-
-            return Optional.ofNullable(singleMsg.exchangeId())
-                .map(GridDhtPartitionExchangeId::topologyVersion)
-                .filter(topVer -> topVer.equals(new AffinityTopologyVersion(4, 
0)))
-                .isPresent();
-        });
-
-        communicationSpiSupplier = () -> clientCommSpi;
-
-        CustomMessageInterceptingDiscoverySpi clientDiscoSpi = new 
CustomMessageInterceptingDiscoverySpi();
-
-        CountDownLatch clientDiscoSpiBlock = new CountDownLatch(1);
-
-        // Delay cache destroying on client node.
-        clientDiscoSpi.interceptor = (msg) -> {
-            if (!(msg instanceof DynamicCacheChangeBatch))
-                return;
-
-            DynamicCacheChangeBatch cacheChangeBatch = 
(DynamicCacheChangeBatch)msg;
-
-            boolean hasCacheStopReq = cacheChangeBatch.requests().stream()
-                .anyMatch(req -> req.stop() && 
req.cacheName().equals(CACHE_NAME));
+        IgniteConfiguration cliCfg = getConfiguration(3, 
createBlockingDiscoverySpi(cliDiscoSpiUnblockedLatch));
 
-            if (hasCacheStopReq)
-                U.awaitQuiet(clientDiscoSpiBlock);
-        };
+        TestRecordingCommunicationSpi commSpi = 
(TestRecordingCommunicationSpi)cliCfg.getCommunicationSpi();
 
-        discoverySpiSupplier = () -> clientDiscoSpi;
+        // Delay client join process.
+        blockSingleMessage(commSpi);
 
-        IgniteInternalFuture<IgniteEx> clientStartFut = 
GridTestUtils.runAsync(() -> startClientGrid(3));
+        IgniteInternalFuture<IgniteEx> clientStartFut = 
GridTestUtils.runAsync(() -> startClientGrid(cliCfg));
 
         // Wait till client node starts join process.
-        clientCommSpi.waitForBlocked();
+        commSpi.waitForBlocked();
 
         // Destroy cache on server nodes.
         crd.destroyCache(CACHE_NAME);
 
         // Resume client join.
-        clientCommSpi.stopBlock();
+        commSpi.stopBlock();
 
         // Client join should succeed.
         IgniteEx client = clientStartFut.get();
@@ -143,7 +107,7 @@ public class ClientSlowDiscoveryTopologyChangeTest extends 
ClientSlowDiscoveryAb
         }
         finally {
             // Resume processing cache destroy on client node.
-            clientDiscoSpiBlock.countDown();
+            cliDiscoSpiUnblockedLatch.countDown();
         }
 
         // Wait till cache destroyed on client node.
@@ -157,4 +121,35 @@ public class ClientSlowDiscoveryTopologyChangeTest extends 
ClientSlowDiscoveryAb
 
         Assert.assertNull("Cache should be destroyed on client node", 
client.cache(CACHE_NAME));
     }
+
+    /** */
+    private TcpDiscoverySpi createBlockingDiscoverySpi(CountDownLatch 
discoSpiUnblockedLatch) {
+        return new CustomMessageInterceptingDiscoverySpi(msg -> {
+            if (!(msg instanceof DynamicCacheChangeBatch))
+                return;
+
+            DynamicCacheChangeBatch cacheChangeBatch = 
(DynamicCacheChangeBatch)msg;
+
+            boolean hasCacheStopReq = cacheChangeBatch.requests().stream()
+                .anyMatch(req -> req.stop() && 
req.cacheName().equals(CACHE_NAME));
+
+            if (hasCacheStopReq)
+                U.awaitQuiet(discoSpiUnblockedLatch);
+        });
+    }
+
+    /** */
+    private void blockSingleMessage(TestRecordingCommunicationSpi commSpi) {
+        commSpi.blockMessages((node, msg) -> {
+            if (!(msg instanceof GridDhtPartitionsSingleMessage))
+                return false;
+
+            GridDhtPartitionsSingleMessage singleMsg = 
(GridDhtPartitionsSingleMessage)msg;
+
+            return Optional.ofNullable(singleMsg.exchangeId())
+                .map(GridDhtPartitionExchangeId::topologyVersion)
+                .filter(topVer -> topVer.equals(new AffinityTopologyVersion(4, 
0)))
+                .isPresent();
+        });
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java
index d408333bc91..b1c1958f61a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java
@@ -34,15 +34,12 @@ import 
org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionTimeoutException;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -204,9 +201,9 @@ public class ClientSlowDiscoveryTransactionRemapTest 
extends ClientSlowDiscovery
     /**
      * Interface to work with cache operations within transaction.
      */
-    private static interface TestTransaction<K, V> {
+    private interface TestTransaction<K, V> {
         /** Possible operations. */
-        static int POSSIBLE_OPERATIONS = 5;
+        int POSSIBLE_OPERATIONS = 5;
 
         /**
          * @param key Key.
@@ -347,7 +344,7 @@ public class ClientSlowDiscoveryTransactionRemapTest 
extends ClientSlowDiscovery
     public IgniteInClosure<TestTransaction<?, ?>> operation;
 
     /** Client disco spi block. */
-    private CountDownLatch clientDiscoSpiBlock;
+    private CountDownLatch cliDiscoSpiUnblockedLatch;
 
     /** Client node to perform operations. */
     private IgniteEx clnt;
@@ -368,34 +365,25 @@ public class ClientSlowDiscoveryTransactionRemapTest 
extends ClientSlowDiscovery
         cleanPersistenceDir();
     }
 
-    /** */
-    @Before
-    public void before() throws Exception {
-        NodeJoinInterceptingDiscoverySpi clientDiscoSpi = new 
NodeJoinInterceptingDiscoverySpi();
-
-        clientDiscoSpiBlock = new CountDownLatch(1);
+    /** {@inheritDoc} */
+    @Override public void beforeTest() throws Exception {
+        cliDiscoSpiUnblockedLatch = new CountDownLatch(1);
 
-        // Delay node join of second client.
-        clientDiscoSpi.interceptor = msg -> {
+        NodeJoinInterceptingDiscoverySpi clientDiscoSpi = new 
NodeJoinInterceptingDiscoverySpi(msg -> {
             if (msg.nodeId().toString().endsWith("2"))
-                U.awaitQuiet(clientDiscoSpiBlock);
-        };
-
-        discoverySpiSupplier = () -> clientDiscoSpi;
+                U.awaitQuiet(cliDiscoSpiUnblockedLatch);
+        });
 
-        clnt = startClientGrid(1);
+        clnt = startClientGrid(getConfiguration(1, clientDiscoSpi));
 
         for (int k = 0; k < 64; k++)
             clnt.cache(CACHE_NAME).put(k, 0);
 
-        discoverySpiSupplier = TcpDiscoverySpi::new;
-
         startClientGrid(2);
     }
 
-    /** */
-    @After
-    public void after() throws Exception {
+    /** {@inheritDoc} */
+    @Override public void afterTest() throws Exception {
         // Stop client nodes.
         stopGrid(1);
         stopGrid(2);
@@ -421,7 +409,7 @@ public class ClientSlowDiscoveryTransactionRemapTest 
extends ClientSlowDiscovery
             // Expected.
         }
         finally {
-            clientDiscoSpiBlock.countDown();
+            cliDiscoSpiUnblockedLatch.countDown();
         }
 
         // After resume second client join, transaction should succesfully 
await new affinity and commit.
@@ -451,7 +439,7 @@ public class ClientSlowDiscoveryTransactionRemapTest 
extends ClientSlowDiscovery
             // Expected.
         }
         finally {
-            clientDiscoSpiBlock.countDown();
+            cliDiscoSpiUnblockedLatch.countDown();
         }
 
         // After resume second client join, transaction should be timed out 
and rolled back.

Reply via email to