IGNITE-8897 A node with a longer BaselineHistory joining the cluster causes the cluster to stop. - Fixes #4357.
Signed-off-by: Dmitriy Pavlov <dpav...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e7c3566d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e7c3566d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e7c3566d Branch: refs/heads/ignite-8783 Commit: e7c3566d11d6276b18a8c54e7b7f9bf3f9b3126a Parents: 1077090 Author: Sergey Chugunov <sergey.chugu...@gmail.com> Authored: Mon Jul 16 17:27:49 2018 +0300 Committer: Dmitriy Pavlov <dpav...@apache.org> Committed: Mon Jul 16 17:27:49 2018 +0300 ---------------------------------------------------------------------- .../cluster/DiscoveryDataClusterState.java | 11 +- .../cluster/GridClusterStateProcessor.java | 28 +- ...eBaselineAffinityTopologyActivationTest.java | 289 ++++++++++++++++--- 3 files changed, 281 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c3566d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java index 8405410..b6029a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java @@ -32,9 +32,9 @@ import org.jetbrains.annotations.Nullable; * baseline topology. * <p> * This object also captures a transitional cluster state, when one or more fields are changing. In this case, - * a {@code transitionReqId} field is set to a non-null value and {@code prevState} captures previous cluster state. 
+ * a {@code transitionReqId} field is set to a non-null value and {@code previousBaselineTopology} captures previous cluster state. * A joining node catching the cluster in an intermediate state will observe {@code transitionReqId} field to be - * non-null, however the {@code prevState} will not be sent to the joining node. + * non-null, however the {@code previousBaselineTopology} will not be sent to the joining node. * * TODO https://issues.apache.org/jira/browse/IGNITE-7640 This class must be immutable, transitionRes must be set by calling finish(). */ @@ -201,6 +201,13 @@ public class DiscoveryDataClusterState implements Serializable { } /** + * @return Previous Baseline topology. + */ + @Nullable public BaselineTopology previousBaselineTopology() { + return prevState != null ? prevState.baselineTopology() : null; + } + + /** * @return {@code True} if baseline topology is set in the cluster. {@code False} otherwise. */ public boolean hasBaselineTopology() { http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c3566d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index d4d0eb1..da0bbf6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -947,8 +947,24 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I } } + if (globalState.transition() && globalState.previousBaselineTopology() == null) { + //case when cluster is activating for the first time and other node with existing baseline topology + //tries to join + + 
String msg = "Node with set up BaselineTopology is not allowed " + + "to join cluster in the process of first activation: " + node.consistentId(); + + return new IgniteNodeValidationResult(node.id(), msg, msg); + } + + BaselineTopology clusterBlt; + + if (globalState.transition()) + clusterBlt = globalState.previousBaselineTopology(); + else + clusterBlt = globalState.baselineTopology(); + BaselineTopology joiningNodeBlt = joiningNodeState.baselineTopology(); - BaselineTopology clusterBlt = globalState.baselineTopology(); String recommendation = " Consider cleaning persistent storage of the node and adding it to the cluster again."; @@ -968,7 +984,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I if (!clusterBlt.isCompatibleWith(joiningNodeBlt)) { String msg = "BaselineTopology of joining node (" + node.consistentId() - + " ) is not compatible with BaselineTopology in the cluster." + + ") is not compatible with BaselineTopology in the cluster." + " Branching history of cluster BlT (" + clusterBlt.branchingHistory() + ") doesn't contain branching point hash of joining node BlT (" + joiningNodeBlt.branchingPointHash() @@ -1174,9 +1190,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I } /** {@inheritDoc} */ - @Override public void onBaselineTopologyChanged(BaselineTopology blt, BaselineTopologyHistoryItem prevBltHistItem) throws IgniteCheckedException { + @Override public void onBaselineTopologyChanged + ( + BaselineTopology blt, + BaselineTopologyHistoryItem prevBltHistItem + ) throws IgniteCheckedException { if (compatibilityMode) { - if (log.isDebugEnabled()) + if (log.isInfoEnabled()) log.info("BaselineTopology won't be stored as this node is running in compatibility mode"); return; http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c3566d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteBaselineAffinityTopologyActivationTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteBaselineAffinityTopologyActivationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteBaselineAffinityTopologyActivationTest.java index ce287d1..f44e792 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteBaselineAffinityTopologyActivationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteBaselineAffinityTopologyActivationTest.java @@ -21,13 +21,14 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.BaselineNode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -36,18 +37,26 @@ import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.DetachedClusterNode; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cluster.BaselineTopology; import 
org.apache.ignite.internal.processors.cluster.BaselineTopologyHistory; import org.apache.ignite.internal.processors.cluster.BaselineTopologyHistoryItem; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + /** * */ @@ -58,6 +67,9 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst /** Entries count to add to cache. */ private static final int ENTRIES_COUNT = 100; + /** */ + private static final String CACHE_NAME = "dfltCache"; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -73,6 +85,16 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst ).setWalMode(WALMode.LOG_ONLY) ); + cfg.setCommunicationSpi(new SingleMessageInterceptorCommunicationSpi()); + + cfg.setCacheConfiguration(new CacheConfiguration<Integer, Integer>() + .setName(CACHE_NAME) + .setCacheMode(PARTITIONED) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setBackups(1) + .setAffinity(new RendezvousAffinityFunction(32, null)) + ); + return cfg; } @@ -98,12 +120,12 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst public void testAutoActivationWithCompatibleOldNode() throws Exception { startGridWithConsistentId("A"); startGridWithConsistentId("B"); - 
startGridWithConsistentId("C").active(true); + startGridWithConsistentId("C").cluster().active(true); stopAllGrids(false); startGridWithConsistentId("A"); - startGridWithConsistentId("B").active(true); + startGridWithConsistentId("B").cluster().active(true); { IgniteEx nodeA = grid("A"); @@ -133,7 +155,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst boolean active = GridTestUtils.waitForCondition( new GridAbsPredicate() { @Override public boolean apply() { - return nodeC.active(); + return nodeC.cluster().active(); } }, 10_000 @@ -149,7 +171,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst public void testBltChangeTopVerRemoveOnlineNodeFails() throws Exception { Ignite ignite = startGridWithConsistentId("A"); - ignite.active(true); + ignite.cluster().active(true); long singleNodeTopVer = ignite.cluster().topologyVersion(); @@ -181,7 +203,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst Ignite nodeB = startGridWithConsistentId("B"); Ignite nodeC = startGridWithConsistentId("OnlineConsID"); - nodeC.active(true); + nodeC.cluster().active(true); boolean expectedExceptionIsThrown = false; @@ -253,16 +275,16 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst public void testIncompatibleBltNodeIsProhibitedToJoinCluster() throws Exception { startGridWithConsistentId("A"); startGridWithConsistentId("B"); - startGridWithConsistentId("C").active(true); + startGridWithConsistentId("C").cluster().active(true); stopAllGrids(false); startGridWithConsistentId("A"); - startGridWithConsistentId("B").active(true); + startGridWithConsistentId("B").cluster().active(true); stopAllGrids(false); - startGridWithConsistentId("C").active(true); + startGridWithConsistentId("C").cluster().active(true); stopAllGrids(false); @@ -326,7 +348,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst Ignite nodeB = 
startGridWithConsistentId("B"); Ignite nodeC = startGridWithConsistentId("C"); - nodeC.active(true); + nodeC.cluster().active(true); verifyBaselineTopologyOnNodes(verifier1, new Ignite[] {nodeA, nodeB, nodeC}); stopAllGrids(false); @@ -334,7 +356,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst nodeA = startGridWithConsistentId("A"); nodeB = startGridWithConsistentId("B"); - nodeB.active(true); + nodeB.cluster().active(true); verifyBaselineTopologyOnNodes(verifier2, new Ignite[] {nodeA, nodeB}); @@ -348,6 +370,161 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst } /** + * + * Test verifies that restart node from baseline when PME and BLT change processes + * are taking place in the cluster simultaneously doesn't lead to shut down of alive cluster nodes. + * + * @throws Exception If failed. + */ + public void testNodeJoinsDuringPartitionMapExchange() throws Exception { + startGridWithConsistentId("A"); + startGridWithConsistentId("B"); + startGridWithConsistentId("C"); + + IgniteEx grid = grid("B"); + + grid.cluster().active(true); + + IgniteCache<Object, Object> cache = grid.getOrCreateCache(CACHE_NAME); + + for (int i = 0; i < 100; i++) + cache.put(i, i * 2); + + awaitPartitionMapExchange(); + + final long topVer = grid.cluster().topologyVersion() + 1; + + final CountDownLatch latch = new CountDownLatch(1); + + SingleMessageInterceptorCommunicationSpi commSpi = (SingleMessageInterceptorCommunicationSpi) grid + .configuration().getCommunicationSpi(); + + commSpi.blockMsgsWithLatch(latch); + + try { + GridTestUtils.runAsync( + () -> startGridWithConsistentId("D") + ).get(20_000); + } + catch (Exception ignored) { + // timeout exception is expected here + } + + try { + GridTestUtils.runAsync( + () -> grid.cluster().setBaselineTopology(topVer) + ).get(10_000); + } + catch (Exception ignored) { + // timeout exception is expected here + } + + IgniteInternalFuture restartFut = GridTestUtils.runAsync( 
+ () -> { + try { + stopGrid("C", true); + startGridWithConsistentId("C"); + } + catch (Exception ignored) { + //ignored + } + } + ); + + latch.countDown(); + + restartFut.get(); + + awaitPartitionMapExchange(); + + long expActivationHash = (long)"A".hashCode() + "B".hashCode() + "C".hashCode(); + + checkBaselineTopologyOnNode(grid("A"), 1, 1, 1, expActivationHash); + checkBaselineTopologyOnNode(grid("B"), 1, 1, 1, expActivationHash); + checkBaselineTopologyOnNode(grid("C"), 1, 1, 1, expActivationHash); + checkBaselineTopologyOnNode(grid("D"), 1, 1, 1, expActivationHash); + } + + /** + * @param ig Ignite. + * @param expBltId Expected BaselineTopology ID. + * @param expBltHistSize Expected Baseline history size. + * @param expBranchingHistSize Expected branching history size. + * @param expActivationHash Expected activation hash. + */ + private void checkBaselineTopologyOnNode( + Ignite ig, + int expBltId, + int expBltHistSize, + int expBranchingHistSize, + long expActivationHash) { + BaselineTopology blt = getBaselineTopology(ig); + BaselineTopologyHistory bltHist = getBaselineTopologyHistory(ig); + + assertNotNull(bltHist); + assertEquals(expBltId, blt.id()); + + assertEquals(expBltHistSize, bltHist.history().size()); + BaselineTopologyHistoryItem histItem = bltHist.history().get(0); + + assertEquals(expBranchingHistSize, histItem.branchingHistory().size()); + assertEquals(expActivationHash, (long)histItem.branchingHistory().get(0)); + } + + /** + * Test verifies that node with set up BaselineTopology is not allowed to join the cluster + * in the process of on-going first activation. + * + * @throws Exception If failed. 
+ */ + public void testNodeWithBltIsNotAllowedToJoinClusterDuringFirstActivation() throws Exception { + Ignite nodeC = startGridWithConsistentId("C"); + + nodeC.cluster().active(true); + + stopGrid("C", false); + + Ignite nodeA = startGridWithConsistentId("A"); + Ignite nodeB = startGridWithConsistentId("B"); + + final CountDownLatch latch = new CountDownLatch(1); + + SingleMessageInterceptorCommunicationSpi commSpi = (SingleMessageInterceptorCommunicationSpi) nodeB + .configuration().getCommunicationSpi(); + + commSpi.blockMsgsWithLatch(latch); + + GridTestUtils.runAsync( + () -> { + try { + nodeA.cluster().active(true); + } + catch (Exception e) { + log.warning("Exception during activation", e); + } + }); + + try { + startGridWithConsistentId("C"); + } + catch (Exception e) { + Throwable cause = e.getCause(); + + while (!(cause instanceof IgniteSpiException)) + cause = cause.getCause(); + + assertNotNull(cause); + + String msg = cause.getMessage(); + assertNotNull(msg); + assertTrue(msg.startsWith("Node with set up BaselineTopology is not allowed " + + "to join cluster in the process of first activation:")); + } + + latch.countDown(); + } + + /** * Verifies that when new node outside of baseline topology joins active cluster with BLT already set * it receives BLT from the cluster and stores it locally. 
*/ @@ -356,7 +533,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst Ignite nodeB = startGridWithConsistentId("B"); Ignite nodeC = startGridWithConsistentId("C"); - nodeC.active(true); + nodeC.cluster().active(true); BaselineTopologyVerifier verifier1 = new BaselineTopologyVerifier() { @Override public void verify(BaselineTopology blt) { @@ -376,7 +553,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst nodeD = startGridWithConsistentId("D"); - assertFalse(nodeD.active()); + assertFalse(nodeD.cluster().active()); verifyBaselineTopologyOnNodes(verifier1, new Ignite[] {nodeD}); } @@ -403,7 +580,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst startGridWithConsistentId("B"); Ignite nodeC = startGridWithConsistentId("C"); - nodeC.active(true); + nodeC.cluster().active(true); stopGrid("B", false); @@ -457,7 +634,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst Ignite nodeB = startGridWithConsistentId("B"); Ignite nodeC = startGridWithConsistentId("C"); - nodeC.active(true); + nodeC.cluster().active(true); IgniteEx nodeD = (IgniteEx) startGridWithConsistentId("D"); @@ -480,7 +657,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst Ignite nodeB = startGridWithConsistentId("B"); Ignite nodeC = startGridWithConsistentId("C"); - nodeA.active(true); + nodeA.cluster().active(true); nodeA.cluster().setBaselineTopology(null); @@ -541,11 +718,11 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst Ignite nodeA = startGridWithConsistentId("A"); - nodeA.active(true); + nodeA.cluster().active(true); Ignite nodeB = startGridWithConsistentId("B"); - nodeA.active(true); + nodeA.cluster().active(true); verifyBaselineTopologyOnNodes(verifier, new Ignite[] {nodeA, nodeB}); } @@ -557,7 +734,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst public 
void testAutoActivationWithBaselineTopologyPreset() throws Exception { Ignite ig = startGridWithConsistentId("A"); - ig.active(true); + ig.cluster().active(true); ig.cluster().setBaselineTopology(Arrays.asList(new BaselineNode[] { createBaselineNodeWithConsId("A"), createBaselineNodeWithConsId("B"), createBaselineNodeWithConsId("C")})); @@ -573,7 +750,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst boolean activated = GridTestUtils.waitForCondition( new GridAbsPredicate() { @Override public boolean apply() { - return ig1.active(); + return ig1.cluster().active(); } }, 10_000 @@ -599,7 +776,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst IgniteEx srv = grid(0); - srv.active(true); + srv.cluster().active(true); createAndFillCache(srv); @@ -614,7 +791,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst boolean clusterActive = GridTestUtils.waitForCondition( new GridAbsPredicate() { @Override public boolean apply() { - return ig.active(); + return ig.cluster().active(); } }, 10_000); @@ -632,21 +809,21 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst IgniteEx srv = grid(0); - srv.active(true); + srv.cluster().active(true); awaitPartitionMapExchange(); - assertTrue(srv.active()); + assertTrue(srv.cluster().active()); - srv.active(false); + srv.cluster().active(false); - assertFalse(srv.active()); + assertFalse(srv.cluster().active()); startGrid(2); Thread.sleep(3_000); - assertFalse(srv.active()); + assertFalse(srv.cluster().active()); } /** @@ -657,13 +834,13 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst IgniteEx srv = grid(0); - srv.active(true); + srv.cluster().active(true); awaitPartitionMapExchange(); - assertTrue(srv.active()); + assertTrue(srv.cluster().active()); - srv.active(false); + srv.cluster().active(false); BaselineTopology blt = getBaselineTopology(srv); @@ -704,7 +881,7 @@ 
public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst startGridWithConsistentId("B"); startGridWithConsistentId("C"); - nodeA.active(true); + nodeA.cluster().active(true); stopGrid("C", false); @@ -755,7 +932,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst startGridWithConsistentId("B"); startGridWithConsistentId("C"); - nodeA.active(true); + nodeA.cluster().active(true); stopGrid("C", false); @@ -783,11 +960,11 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst startGridWithConsistentId("B"); startGridWithConsistentId("C"); - nodeA.active(true); + nodeA.cluster().active(true); assertNotNull(nodeA.cluster().currentBaselineTopology()); - nodeA.active(false); + nodeA.cluster().active(false); stopAllGrids(); @@ -800,7 +977,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst boolean clusterActive = GridTestUtils.waitForCondition( new GridAbsPredicate() { @Override public boolean apply() { - return ig.active(); + return ig.cluster().active(); } }, 10_000); @@ -822,7 +999,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst Ignite nodeC = startGridWithConsistentId("C"); - nodeC.active(true); + nodeC.cluster().active(true); stopGrid("C", false); @@ -869,14 +1046,14 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst startGridWithConsistentId("B"); startGridWithConsistentId("C"); - nodeA.active(true); + nodeA.cluster().active(true); stopAllGrids(false); nodeA = startGridWithConsistentId("A"); startGridWithConsistentId("B"); - nodeA.active(true); + nodeA.cluster().active(true); nodeA.cluster().setBaselineTopology(baselineNodes(nodeA.cluster().forServers().nodes())); @@ -884,7 +1061,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst nodeA = startGridWithConsistentId("A"); - nodeA.active(true); + nodeA.cluster().active(true); 
nodeA.cluster().setBaselineTopology(baselineNodes(nodeA.cluster().forServers().nodes())); @@ -894,7 +1071,7 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst boolean activated = GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - return node.active(); + return node.cluster().active(); } }, 10_000); @@ -959,12 +1136,42 @@ public class IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst private CacheConfiguration cacheConfiguration() { return new CacheConfiguration() .setName(DEFAULT_CACHE_NAME) - .setCacheMode(CacheMode.PARTITIONED) + .setCacheMode(PARTITIONED) .setAtomicityMode(CacheAtomicityMode.ATOMIC) .setBackups(2) .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); } + /** + * TcpCommunicationSpi aimed to delay {@link GridDhtPartitionsSingleMessage} to emulate PME hanging. + */ + private static class SingleMessageInterceptorCommunicationSpi extends TcpCommunicationSpi { + /** */ + private volatile CountDownLatch singleMsgSndLatch; + + /** {@inheritDoc} */ + @Override public void sendMessage( + ClusterNode node, + Message msg, + IgniteInClosure<IgniteException> ackC + ) throws IgniteSpiException { + if (((GridIoMessage) msg).message() instanceof GridDhtPartitionsSingleMessage) { + try { + if (singleMsgSndLatch != null) + singleMsgSndLatch.await(); + } + catch (Exception ignored) { } + } + + super.sendMessage(node, msg, ackC); + } + + /** */ + void blockMsgsWithLatch(CountDownLatch latch) { + singleMsgSndLatch = latch; + } + } + /** */ private static final class TestValue { /** */