IGNITE-8897 Node with longer BaselineHistory joining the cluster causes cluster 
stopping. - Fixes #4357.

Signed-off-by: Dmitriy Pavlov <dpav...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e7c3566d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e7c3566d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e7c3566d

Branch: refs/heads/ignite-8783
Commit: e7c3566d11d6276b18a8c54e7b7f9bf3f9b3126a
Parents: 1077090
Author: Sergey Chugunov <sergey.chugu...@gmail.com>
Authored: Mon Jul 16 17:27:49 2018 +0300
Committer: Dmitriy Pavlov <dpav...@apache.org>
Committed: Mon Jul 16 17:27:49 2018 +0300

----------------------------------------------------------------------
 .../cluster/DiscoveryDataClusterState.java      |  11 +-
 .../cluster/GridClusterStateProcessor.java      |  28 +-
 ...eBaselineAffinityTopologyActivationTest.java | 289 ++++++++++++++++---
 3 files changed, 281 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c3566d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
index 8405410..b6029a3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
@@ -32,9 +32,9 @@ import org.jetbrains.annotations.Nullable;
  * baseline topology.
  * <p>
  * This object also captures a transitional cluster state, when one or more 
fields are changing. In this case,
- * a {@code transitionReqId} field is set to a non-null value and {@code 
prevState} captures previous cluster state.
+ * a {@code transitionReqId} field is set to a non-null value and {@code 
previousBaselineTopology} captures previous cluster state.
  * A joining node catching the cluster in an intermediate state will observe 
{@code transitionReqId} field to be
- * non-null, however the {@code prevState} will not be sent to the joining 
node.
+ * non-null, however the {@code previousBaselineTopology} will not be sent to 
the joining node.
  *
  * TODO https://issues.apache.org/jira/browse/IGNITE-7640 This class must be 
immutable, transitionRes must be set by calling finish().
  */
@@ -201,6 +201,13 @@ public class DiscoveryDataClusterState implements 
Serializable {
     }
 
     /**
+     * Exposes the baseline topology captured by the previous cluster state.
+     *
+     * @return Baseline topology of the previous cluster state, or {@code null} if no previous state is held
+     *      (the previous state itself may also report {@code null}).
+     */
+    @Nullable public BaselineTopology previousBaselineTopology() {
+        if (prevState == null)
+            return null;
+
+        return prevState.baselineTopology();
+    }
+
+    /**
      * @return {@code True} if baseline topology is set in the cluster. {@code 
False} otherwise.
      */
     public boolean hasBaselineTopology() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c3566d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index d4d0eb1..da0bbf6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -947,8 +947,24 @@ public class GridClusterStateProcessor extends 
GridProcessorAdapter implements I
             }
         }
 
+        if (globalState.transition() && globalState.previousBaselineTopology() 
== null) {
+            //case when cluster is activating for the first time and other 
node with existing baseline topology
+            //tries to join
+
+            String msg = "Node with set up BaselineTopology is not allowed " +
+                "to join cluster in the process of first activation: " + 
node.consistentId();
+
+            return new IgniteNodeValidationResult(node.id(), msg, msg);
+        }
+
+        BaselineTopology clusterBlt;
+
+        if (globalState.transition())
+            clusterBlt = globalState.previousBaselineTopology();
+        else
+            clusterBlt = globalState.baselineTopology();
+
         BaselineTopology joiningNodeBlt = joiningNodeState.baselineTopology();
-        BaselineTopology clusterBlt = globalState.baselineTopology();
 
         String recommendation = " Consider cleaning persistent storage of the 
node and adding it to the cluster again.";
 
@@ -968,7 +984,7 @@ public class GridClusterStateProcessor extends 
GridProcessorAdapter implements I
             if (!clusterBlt.isCompatibleWith(joiningNodeBlt)) {
                 String msg = "BaselineTopology of joining node ("
                     + node.consistentId()
-                    + " ) is not compatible with BaselineTopology in the 
cluster."
+                    + ") is not compatible with BaselineTopology in the 
cluster."
                     + " Branching history of cluster BlT (" + 
clusterBlt.branchingHistory()
                     + ") doesn't contain branching point hash of joining node 
BlT ("
                     + joiningNodeBlt.branchingPointHash()
@@ -1174,9 +1190,13 @@ public class GridClusterStateProcessor extends 
GridProcessorAdapter implements I
     }
 
     /** {@inheritDoc} */
-    @Override public void onBaselineTopologyChanged(BaselineTopology blt, 
BaselineTopologyHistoryItem prevBltHistItem) throws IgniteCheckedException {
+    @Override public void onBaselineTopologyChanged
+    (
+        BaselineTopology blt,
+        BaselineTopologyHistoryItem prevBltHistItem
+    ) throws IgniteCheckedException {
         if (compatibilityMode) {
-            if (log.isDebugEnabled())
+            if (log.isInfoEnabled())
                 log.info("BaselineTopology won't be stored as this node is 
running in compatibility mode");
 
             return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c3566d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteBaselineAffinityTopologyActivationTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteBaselineAffinityTopologyActivationTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteBaselineAffinityTopologyActivationTest.java
index ce287d1..f44e792 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteBaselineAffinityTopologyActivationTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteBaselineAffinityTopologyActivationTest.java
@@ -21,13 +21,14 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.BaselineNode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -36,18 +37,26 @@ import 
org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.processors.cluster.BaselineTopology;
 import org.apache.ignite.internal.processors.cluster.BaselineTopologyHistory;
 import 
org.apache.ignite.internal.processors.cluster.BaselineTopologyHistoryItem;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
 
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
 /**
  *
  */
@@ -58,6 +67,9 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
     /** Entries count to add to cache. */
     private static final int ENTRIES_COUNT = 100;
 
+    /** */
+    private static final String CACHE_NAME = "dfltCache";
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -73,6 +85,16 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
             ).setWalMode(WALMode.LOG_ONLY)
         );
 
+        cfg.setCommunicationSpi(new 
SingleMessageInterceptorCommunicationSpi());
+
+        cfg.setCacheConfiguration(new CacheConfiguration<Integer, Integer>()
+            .setName(CACHE_NAME)
+            .setCacheMode(PARTITIONED)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setBackups(1)
+            .setAffinity(new RendezvousAffinityFunction(32, null))
+        );
+
         return cfg;
     }
 
@@ -98,12 +120,12 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
     public void testAutoActivationWithCompatibleOldNode() throws Exception {
         startGridWithConsistentId("A");
         startGridWithConsistentId("B");
-        startGridWithConsistentId("C").active(true);
+        startGridWithConsistentId("C").cluster().active(true);
 
         stopAllGrids(false);
 
         startGridWithConsistentId("A");
-        startGridWithConsistentId("B").active(true);
+        startGridWithConsistentId("B").cluster().active(true);
 
         {
             IgniteEx nodeA = grid("A");
@@ -133,7 +155,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         boolean active = GridTestUtils.waitForCondition(
             new GridAbsPredicate() {
                 @Override public boolean apply() {
-                    return nodeC.active();
+                    return nodeC.cluster().active();
                 }
             },
             10_000
@@ -149,7 +171,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
     public void testBltChangeTopVerRemoveOnlineNodeFails() throws Exception {
         Ignite ignite = startGridWithConsistentId("A");
 
-        ignite.active(true);
+        ignite.cluster().active(true);
 
         long singleNodeTopVer = ignite.cluster().topologyVersion();
 
@@ -181,7 +203,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         Ignite nodeB = startGridWithConsistentId("B");
         Ignite nodeC = startGridWithConsistentId("OnlineConsID");
 
-        nodeC.active(true);
+        nodeC.cluster().active(true);
 
         boolean expectedExceptionIsThrown = false;
 
@@ -253,16 +275,16 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
     public void testIncompatibleBltNodeIsProhibitedToJoinCluster() throws 
Exception {
         startGridWithConsistentId("A");
         startGridWithConsistentId("B");
-        startGridWithConsistentId("C").active(true);
+        startGridWithConsistentId("C").cluster().active(true);
 
         stopAllGrids(false);
 
         startGridWithConsistentId("A");
-        startGridWithConsistentId("B").active(true);
+        startGridWithConsistentId("B").cluster().active(true);
 
         stopAllGrids(false);
 
-        startGridWithConsistentId("C").active(true);
+        startGridWithConsistentId("C").cluster().active(true);
 
         stopAllGrids(false);
 
@@ -326,7 +348,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         Ignite nodeB = startGridWithConsistentId("B");
         Ignite nodeC = startGridWithConsistentId("C");
 
-        nodeC.active(true);
+        nodeC.cluster().active(true);
         verifyBaselineTopologyOnNodes(verifier1, new Ignite[] {nodeA, nodeB, 
nodeC});
 
         stopAllGrids(false);
@@ -334,7 +356,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         nodeA = startGridWithConsistentId("A");
         nodeB = startGridWithConsistentId("B");
 
-        nodeB.active(true);
+        nodeB.cluster().active(true);
 
         verifyBaselineTopologyOnNodes(verifier2, new Ignite[] {nodeA, nodeB});
 
@@ -348,6 +370,161 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
     }
 
     /**
+     * Test verifies that restarting a baseline node while a partition map exchange (PME) and a
+     * BaselineTopology (BLT) change are taking place in the cluster simultaneously doesn't lead
+     * to shut down of alive cluster nodes.
+     * <p>
+     * PME hanging is emulated by {@code SingleMessageInterceptorCommunicationSpi}: node B holds back
+     * its {@code GridDhtPartitionsSingleMessage}s until the test releases the latch.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNodeJoinsDuringPartitionMapExchange() throws Exception {
+        startGridWithConsistentId("A");
+        startGridWithConsistentId("B");
+        startGridWithConsistentId("C");
+
+        IgniteEx grid = grid("B");
+
+        grid.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = grid.getOrCreateCache(CACHE_NAME);
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, i * 2);
+
+        awaitPartitionMapExchange();
+
+        // Presumably the topology version the cluster gets once node D joins (current + 1);
+        // it is used below as the target of the baseline change.
+        final long topVer = grid.cluster().topologyVersion() + 1;
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        SingleMessageInterceptorCommunicationSpi commSpi = 
(SingleMessageInterceptorCommunicationSpi) grid
+            .configuration().getCommunicationSpi();
+
+        // From now on node B does not send partitions single messages until the latch is released.
+        commSpi.blockMsgsWithLatch(latch);
+
+        try {
+            GridTestUtils.runAsync(
+                () -> startGridWithConsistentId("D")
+            ).get(20_000);
+        }
+        catch (Exception ignored) {
+            // Timeout exception is expected here: messages from node B are still blocked.
+            // NOTE(review): any other failure of starting D is swallowed here as well.
+        }
+
+        try {
+            GridTestUtils.runAsync(
+                () -> grid.cluster().setBaselineTopology(topVer)
+            ).get(10_000);
+        }
+        catch (Exception ignored) {
+            // Timeout exception is expected here as well.
+        }
+
+        // Restart node C in the background while both operations above are still pending.
+        IgniteInternalFuture restartFut = GridTestUtils.runAsync(
+            () -> {
+                try {
+                    stopGrid("C", true);
+                    startGridWithConsistentId("C");
+                }
+                catch (Exception ignored) {
+                    // Ignored: a failed restart shows up below when grid("C") is checked.
+                }
+            }
+        );
+
+        // Unblock node B so the pending operations can proceed.
+        latch.countDown();
+
+        restartFut.get();
+
+        awaitPartitionMapExchange();
+
+        // Expected activation hash is the sum of hash codes of consistent IDs A, B and C.
+        long expActivationHash = (long)"A".hashCode() + "B".hashCode() + 
"C".hashCode();
+
+        checkBaselineTopologyOnNode(grid("A"), 1, 1, 1, expActivationHash);
+        checkBaselineTopologyOnNode(grid("B"), 1, 1, 1, expActivationHash);
+        checkBaselineTopologyOnNode(grid("C"), 1, 1, 1, expActivationHash);
+        checkBaselineTopologyOnNode(grid("D"), 1, 1, 1, expActivationHash);
+    }
+
+    /**
+     * Asserts that the given node holds the expected BaselineTopology and that the first item of its
+     * BaselineTopology history has the expected branching history.
+     *
+     * @param ig Ignite.
+     * @param expBltId Expected BaselineTopology ID.
+     * @param expBltHistSize Expected Baseline history size.
+     * @param expBranchingHistSize Expected branching history size of the first history item.
+     * @param expActivationHash Expected activation hash (first entry of the branching history).
+     */
+    private void checkBaselineTopologyOnNode(
+        Ignite ig,
+        int expBltId,
+        int expBltHistSize,
+        int expBranchingHistSize,
+        long expActivationHash) {
+        BaselineTopology blt = getBaselineTopology(ig);
+        BaselineTopologyHistory bltHist = getBaselineTopologyHistory(ig);
+
+        // Check both objects up front so a missing BLT fails as an assertion, not as an NPE.
+        assertNotNull(blt);
+        assertNotNull(bltHist);
+
+        assertEquals(expBltId, blt.id());
+        assertEquals(expBltHistSize, bltHist.history().size());
+
+        BaselineTopologyHistoryItem histItem = bltHist.history().get(0);
+
+        assertEquals(expBranchingHistSize, histItem.branchingHistory().size());
+        assertEquals(expActivationHash, (long)histItem.branchingHistory().get(0));
+    }
+
+    /**
+     * Test verifies that node with set up BaselineTopology is not allowed to join the cluster
+     * in the process of on-going first activation.
+     * <p>
+     * Node C is activated alone first so that it has a BaselineTopology of its own. Then a fresh cluster
+     * of A and B starts its activation, which is held up by blocking partitions single messages of
+     * node B, and node C tries to join in the meantime.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNodeWithBltIsNotAllowedToJoinClusterDuringFirstActivation() throws Exception {
+        Ignite nodeC = startGridWithConsistentId("C");
+
+        nodeC.cluster().active(true);
+
+        stopGrid("C", false);
+
+        Ignite nodeA = startGridWithConsistentId("A");
+        Ignite nodeB = startGridWithConsistentId("B");
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        SingleMessageInterceptorCommunicationSpi commSpi = (SingleMessageInterceptorCommunicationSpi) nodeB
+            .configuration().getCommunicationSpi();
+
+        commSpi.blockMsgsWithLatch(latch);
+
+        boolean joinRejected = false;
+
+        try {
+            GridTestUtils.runAsync(
+                () -> {
+                    try {
+                        nodeA.cluster().active(true);
+                    }
+                    catch (Exception e) {
+                        log.warning("Exception during activation", e);
+                    }
+                });
+
+            try {
+                startGridWithConsistentId("C");
+            }
+            catch (Exception e) {
+                Throwable cause = e;
+
+                // Null-safe walk: stop at the end of the chain instead of failing with an NPE.
+                while (cause != null && !(cause instanceof IgniteSpiException))
+                    cause = cause.getCause();
+
+                assertNotNull("IgniteSpiException is not found in the cause chain of: " + e, cause);
+
+                String msg = cause.getMessage();
+
+                assertNotNull(msg);
+                assertTrue(msg.startsWith("Node with set up BaselineTopology is not allowed " +
+                    "to join cluster in the process of first activation:"));
+
+                joinRejected = true;
+            }
+        }
+        finally {
+            // Always unblock node B, otherwise a failed assertion leaves its messages blocked on teardown.
+            latch.countDown();
+        }
+
+        // Without this check the test passes silently when node C is allowed to join.
+        assertTrue("Node with BaselineTopology was expected to be rejected during first activation",
+            joinRejected);
+    }
+
+    /**
      * Verifies that when new node outside of baseline topology joins active 
cluster with BLT already set
      * it receives BLT from the cluster and stores it locally.
      */
@@ -356,7 +533,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         Ignite nodeB = startGridWithConsistentId("B");
         Ignite nodeC = startGridWithConsistentId("C");
 
-        nodeC.active(true);
+        nodeC.cluster().active(true);
 
         BaselineTopologyVerifier verifier1 = new BaselineTopologyVerifier() {
             @Override public void verify(BaselineTopology blt) {
@@ -376,7 +553,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
 
         nodeD = startGridWithConsistentId("D");
 
-        assertFalse(nodeD.active());
+        assertFalse(nodeD.cluster().active());
 
         verifyBaselineTopologyOnNodes(verifier1, new Ignite[] {nodeD});
     }
@@ -403,7 +580,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         startGridWithConsistentId("B");
         Ignite nodeC = startGridWithConsistentId("C");
 
-        nodeC.active(true);
+        nodeC.cluster().active(true);
 
         stopGrid("B", false);
 
@@ -457,7 +634,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         Ignite nodeB = startGridWithConsistentId("B");
         Ignite nodeC = startGridWithConsistentId("C");
 
-        nodeC.active(true);
+        nodeC.cluster().active(true);
 
         IgniteEx nodeD = (IgniteEx) startGridWithConsistentId("D");
 
@@ -480,7 +657,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         Ignite nodeB = startGridWithConsistentId("B");
         Ignite nodeC = startGridWithConsistentId("C");
 
-        nodeA.active(true);
+        nodeA.cluster().active(true);
 
         nodeA.cluster().setBaselineTopology(null);
 
@@ -541,11 +718,11 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
 
         Ignite nodeA = startGridWithConsistentId("A");
 
-        nodeA.active(true);
+        nodeA.cluster().active(true);
 
         Ignite nodeB = startGridWithConsistentId("B");
 
-        nodeA.active(true);
+        nodeA.cluster().active(true);
 
         verifyBaselineTopologyOnNodes(verifier, new Ignite[] {nodeA, nodeB});
     }
@@ -557,7 +734,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
     public void testAutoActivationWithBaselineTopologyPreset() throws 
Exception {
         Ignite ig = startGridWithConsistentId("A");
 
-        ig.active(true);
+        ig.cluster().active(true);
 
         ig.cluster().setBaselineTopology(Arrays.asList(new BaselineNode[] {
             createBaselineNodeWithConsId("A"), 
createBaselineNodeWithConsId("B"), createBaselineNodeWithConsId("C")}));
@@ -573,7 +750,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         boolean activated = GridTestUtils.waitForCondition(
             new GridAbsPredicate() {
                @Override public boolean apply() {
-                   return ig1.active();
+                   return ig1.cluster().active();
                }
             },
             10_000
@@ -599,7 +776,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
 
         IgniteEx srv = grid(0);
 
-        srv.active(true);
+        srv.cluster().active(true);
 
         createAndFillCache(srv);
 
@@ -614,7 +791,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         boolean clusterActive = GridTestUtils.waitForCondition(
             new GridAbsPredicate() {
                 @Override public boolean apply() {
-                    return ig.active();
+                    return ig.cluster().active();
                 }
             },
             10_000);
@@ -632,21 +809,21 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
 
         IgniteEx srv = grid(0);
 
-        srv.active(true);
+        srv.cluster().active(true);
 
         awaitPartitionMapExchange();
 
-        assertTrue(srv.active());
+        assertTrue(srv.cluster().active());
 
-        srv.active(false);
+        srv.cluster().active(false);
 
-        assertFalse(srv.active());
+        assertFalse(srv.cluster().active());
 
         startGrid(2);
 
         Thread.sleep(3_000);
 
-        assertFalse(srv.active());
+        assertFalse(srv.cluster().active());
     }
 
     /**
@@ -657,13 +834,13 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
 
         IgniteEx srv = grid(0);
 
-        srv.active(true);
+        srv.cluster().active(true);
 
         awaitPartitionMapExchange();
 
-        assertTrue(srv.active());
+        assertTrue(srv.cluster().active());
 
-        srv.active(false);
+        srv.cluster().active(false);
 
         BaselineTopology blt = getBaselineTopology(srv);
 
@@ -704,7 +881,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         startGridWithConsistentId("B");
         startGridWithConsistentId("C");
 
-        nodeA.active(true);
+        nodeA.cluster().active(true);
 
         stopGrid("C", false);
 
@@ -755,7 +932,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         startGridWithConsistentId("B");
         startGridWithConsistentId("C");
 
-        nodeA.active(true);
+        nodeA.cluster().active(true);
 
         stopGrid("C", false);
 
@@ -783,11 +960,11 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         startGridWithConsistentId("B");
         startGridWithConsistentId("C");
 
-        nodeA.active(true);
+        nodeA.cluster().active(true);
 
         assertNotNull(nodeA.cluster().currentBaselineTopology());
 
-        nodeA.active(false);
+        nodeA.cluster().active(false);
 
         stopAllGrids();
 
@@ -800,7 +977,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
         boolean clusterActive = GridTestUtils.waitForCondition(
             new GridAbsPredicate() {
                 @Override public boolean apply() {
-                    return ig.active();
+                    return ig.cluster().active();
                 }
             },
             10_000);
@@ -822,7 +999,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
 
         Ignite nodeC = startGridWithConsistentId("C");
 
-        nodeC.active(true);
+        nodeC.cluster().active(true);
 
         stopGrid("C", false);
 
@@ -869,14 +1046,14 @@ public class 
IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst
         startGridWithConsistentId("B");
         startGridWithConsistentId("C");
 
-        nodeA.active(true);
+        nodeA.cluster().active(true);
 
         stopAllGrids(false);
 
         nodeA = startGridWithConsistentId("A");
         startGridWithConsistentId("B");
 
-        nodeA.active(true);
+        nodeA.cluster().active(true);
 
         
nodeA.cluster().setBaselineTopology(baselineNodes(nodeA.cluster().forServers().nodes()));
 
@@ -884,7 +1061,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
 
         nodeA = startGridWithConsistentId("A");
 
-        nodeA.active(true);
+        nodeA.cluster().active(true);
 
         
nodeA.cluster().setBaselineTopology(baselineNodes(nodeA.cluster().forServers().nodes()));
 
@@ -894,7 +1071,7 @@ public class IgniteBaselineAffinityTopologyActivationTest 
extends GridCommonAbst
 
         boolean activated = GridTestUtils.waitForCondition(new 
GridAbsPredicate() {
             @Override public boolean apply() {
-                return node.active();
+                return node.cluster().active();
             }
         }, 10_000);
 
@@ -959,12 +1136,42 @@ public class 
IgniteBaselineAffinityTopologyActivationTest extends GridCommonAbst
     private CacheConfiguration cacheConfiguration() {
         return new CacheConfiguration()
             .setName(DEFAULT_CACHE_NAME)
-            .setCacheMode(CacheMode.PARTITIONED)
+            .setCacheMode(PARTITIONED)
             .setAtomicityMode(CacheAtomicityMode.ATOMIC)
             .setBackups(2)
             
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
     }
 
+    /**
+     * TcpCommunicationSpi aimed to delay {@link GridDhtPartitionsSingleMessage} to emulate PME hanging.
+     * <p>
+     * While a latch is installed via {@link #blockMsgsWithLatch(CountDownLatch)}, every thread sending
+     * such a message waits on that latch before the message is passed to the parent SPI.
+     */
+    private static class SingleMessageInterceptorCommunicationSpi extends TcpCommunicationSpi {
+        /** Latch that senders of single messages wait on; {@code null} means nothing is blocked. */
+        private volatile CountDownLatch singleMsgSndLatch;
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(
+            ClusterNode node,
+            Message msg,
+            IgniteInClosure<IgniteException> ackC
+        ) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage
+                && ((GridIoMessage) msg).message() instanceof GridDhtPartitionsSingleMessage) {
+                // Read the volatile field once so that the null check and await() use the same latch.
+                CountDownLatch latch = singleMsgSndLatch;
+
+                if (latch != null) {
+                    try {
+                        latch.await();
+                    }
+                    catch (InterruptedException ignored) {
+                        // Keep the best-effort behavior (the message is still sent),
+                        // but do not lose the interruption request.
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+
+        /**
+         * Makes subsequent sends of single messages wait until the given latch is released.
+         *
+         * @param latch Latch to wait on before sending.
+         */
+        void blockMsgsWithLatch(CountDownLatch latch) {
+            singleMsgSndLatch = latch;
+        }
+    }
+
     /** */
     private static final class TestValue {
         /** */

Reply via email to