This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c41ed2  IGNITE-12328 IgniteException "Failed to resolve nodes 
topology" during cache.removeAll() and constantly changing topology - Fixes 
#7015.
6c41ed2 is described below

commit 6c41ed2ac91070d334a2c4eddf8a4dfebc44037b
Author: Alexey Scherbakov <alexey.scherbak...@gmail.com>
AuthorDate: Tue Oct 29 14:22:00 2019 +0300

    IGNITE-12328 IgniteException "Failed to resolve nodes topology" during 
cache.removeAll() and constantly changing topology - Fixes #7015.
    
    Signed-off-by: Ivan Rakov <ira...@apache.org>
---
 .../processors/cache/GridCacheIdMessage.java       |   2 +-
 .../cache/distributed/dht/GridDhtCacheAdapter.java |  17 +-
 .../dht/GridDhtTransactionalCacheAdapter.java      |   2 +-
 .../distributed/dht/atomic/GridDhtAtomicCache.java |  10 +-
 .../dht/colocated/GridDhtColocatedLockFuture.java  |  39 +-
 .../cache/distributed/near/GridNearLockFuture.java |   4 -
 .../cache/ClientSlowDiscoveryAbstractTest.java     | 121 ++++++
 ... => ClientSlowDiscoveryTopologyChangeTest.java} | 105 +----
 .../ClientSlowDiscoveryTransactionRemapTest.java   | 473 +++++++++++++++++++++
 .../TxCrossCacheMapOnInvalidTopologyTest.java      | 133 ++++--
 .../TxPartitionCounterStateConsistencyTest.java    |  24 +-
 .../testsuites/IgniteCacheMvccTestSuite5.java      |   2 +
 .../ignite/testsuites/IgniteCacheTestSuite5.java   |   6 +-
 13 files changed, 758 insertions(+), 180 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
index e094439..84d890c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
@@ -112,6 +112,6 @@ public abstract class GridCacheIdMessage extends 
GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridCacheIdMessage.class, this);
+        return S.toString(GridCacheIdMessage.class, this, "super", 
super.toString());
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 31adc06..da03d00 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1274,30 +1274,29 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
     }
 
     /**
-     * @param expVer Expected topology version.
+     * @param mapVer Mapped topology version.
      * @param curVer Current topology version.
      * @return {@code True} if cache affinity changed and operation should be 
remapped.
      */
-    protected final boolean needRemap(AffinityTopologyVersion expVer, 
AffinityTopologyVersion curVer,
-        Collection<KeyCacheObject> keys) {
-        if (curVer.equals(expVer))
+    protected final boolean needRemap(AffinityTopologyVersion mapVer, 
AffinityTopologyVersion curVer) {
+        if (curVer.equals(mapVer))
             return false;
 
-        AffinityTopologyVersion lastAffChangedTopVer = 
ctx.shared().exchange().lastAffinityChangedTopologyVersion(expVer);
+        AffinityTopologyVersion lastAffChangedTopVer = 
ctx.shared().exchange().lastAffinityChangedTopologyVersion(mapVer);
 
-        if (curVer.compareTo(lastAffChangedTopVer) >= 0 && 
curVer.compareTo(expVer) <= 0)
+        if (curVer.isBetween(lastAffChangedTopVer, mapVer))
             return false;
 
         // TODO IGNITE-7164 check mvcc crd for mvcc enabled txs.
 
-        Collection<ClusterNode> cacheNodes0 = 
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer);
+        Collection<ClusterNode> cacheNodes0 = 
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), mapVer);
         Collection<ClusterNode> cacheNodes1 = 
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer);
 
-        if (!cacheNodes0.equals(cacheNodes1) || 
ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
+        if (!cacheNodes0.equals(cacheNodes1) || 
ctx.affinity().affinityTopologyVersion().before(curVer))
             return true;
 
         try {
-            List<List<ClusterNode>> aff1 = ctx.affinity().assignments(expVer);
+            List<List<ClusterNode>> aff1 = ctx.affinity().assignments(mapVer);
             List<List<ClusterNode>> aff2 = ctx.affinity().assignments(curVer);
 
             return !aff1.equals(aff2);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 8fdc6fd..c7f30c7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1083,7 +1083,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, 
V> extends GridDhtCach
             }
 
             try {
-                if (top != null && needRemap(req.topologyVersion(), 
top.readyTopologyVersion(), req.keys())) {
+                if (top != null && needRemap(req.topologyVersion(), 
top.readyTopologyVersion())) {
                     if (log.isDebugEnabled()) {
                         log.debug("Client topology version mismatch, need 
remap lock request [" +
                             "reqTopVer=" + req.topologyVersion() +
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4c47c0c..926530d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1836,10 +1836,16 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                             // Do not check topology version if topology was 
locked on near node by
                             // external transaction or explicit lock.
                             if (!req.topologyLocked()) {
+                                AffinityTopologyVersion waitVer = 
top.topologyVersionFuture().initialVersion();
+
+                                // No need to remap if next future version is 
compatible.
+                                boolean compatible =
+                                    
waitVer.isBetween(req.lastAffinityChangedTopologyVersion(), 
req.topologyVersion());
+
                                 // Can not wait for topology future since it 
will break
                                 // GridNearAtomicCheckUpdateRequest processing.
-                                remap = !top.topologyVersionFuture().isDone() 
||
-                                    needRemap(req.topologyVersion(), 
top.readyTopologyVersion(), req.keys());
+                                remap = !compatible && 
!top.topologyVersionFuture().isDone() ||
+                                    needRemap(req.topologyVersion(), 
top.readyTopologyVersion());
                             }
 
                             if (!remap) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 6731405..b0a0e6d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -178,6 +178,9 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
     /** {@code True} when mappings are ready for processing. */
     private boolean mappingsReady;
 
+    /** */
+    private boolean trackable = true;
+
     /**
      * @param cctx Registry.
      * @param keys Keys to lock.
@@ -262,12 +265,12 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
 
     /** {@inheritDoc} */
     @Override public boolean trackable() {
-        return true;
+        return trackable;
     }
 
     /** {@inheritDoc} */
     @Override public void markNotTrackable() {
-        throw new UnsupportedOperationException();
+        trackable = false;
     }
 
     /**
@@ -772,20 +775,24 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
             topVer = tx.topologyVersionSnapshot();
 
         if (topVer != null) {
-            for (GridDhtTopologyFuture fut : 
cctx.shared().exchange().exchangeFutures()) {
-                if (fut.exchangeDone() && 
fut.topologyVersion().equals(topVer)) {
-                    Throwable err = null;
+            AffinityTopologyVersion lastChangeVer = 
cctx.shared().exchange().lastAffinityChangedTopologyVersion(topVer);
 
-                    // Before cache validation, make sure that this topology 
future is already completed.
-                    try {
-                        fut.get();
-                    }
-                    catch (IgniteCheckedException e) {
-                        err = fut.error();
-                    }
+            IgniteInternalFuture<AffinityTopologyVersion> affFut = 
cctx.shared().exchange().affinityReadyFuture(lastChangeVer);
 
-                    if (err == null)
-                        err = fut.validateCache(cctx, recovery, read, null, 
keys);
+            if (!affFut.isDone()) {
+                try {
+                    affFut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    onDone(err);
+
+                    return;
+                }
+            }
+
+            for (GridDhtTopologyFuture fut : 
cctx.shared().exchange().exchangeFutures()) {
+                if (fut.exchangeDone() && 
fut.topologyVersion().equals(lastChangeVer)) {
+                    Throwable err = fut.validateCache(cctx, recovery, read, 
null, keys);
 
                     if (err != null) {
                         onDone(err);
@@ -1667,10 +1674,6 @@ public final class GridDhtColocatedLockFuture extends 
GridCacheCompoundIdentityF
 
                 if (res.compatibleRemapVersion()) {
                     if (tx != null) {
-                        // Versions are compatible.
-                        cctx.shared().exchange().
-                            
lastAffinityChangedTopologyVersion(res.clientRemapVersion(), 
tx.topologyVersionSnapshot());
-
                         tx.onRemap(res.clientRemapVersion(), false);
 
                         // Use remapped version for all subsequent mappings.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 97535f5..5a4ee46 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1647,10 +1647,6 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
 
                 if (res.compatibleRemapVersion()) {
                     if (tx != null) {
-                        // Versions are compatible.
-                        cctx.shared().exchange().
-                            
lastAffinityChangedTopologyVersion(res.clientRemapVersion(), 
tx.topologyVersionSnapshot());
-
                         tx.onRemap(res.clientRemapVersion(), false);
 
                         // Use remapped version for all subsequent mappings.
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
new file mode 100644
index 0000000..24555b5
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.function.Supplier;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Base for client slow discovery tests: common config and intercepting SPIs.
+ */
+public class ClientSlowDiscoveryAbstractTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    protected static final String CACHE_NAME = "cache";
+
+    /** Cache configuration. */
+    private final CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME)
+        .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+        .setReadFromBackup(false)
+        .setBackups(1)
+        .setAffinity(new RendezvousAffinityFunction(false, 64));
+
+    /** Client mode. */
+    protected boolean clientMode;
+
+    /** Communication SPI supplier. */
+    protected Supplier<CommunicationSpi> communicationSpiSupplier = 
TestRecordingCommunicationSpi::new;
+
+    /** Discovery SPI supplier. */
+    protected Supplier<DiscoverySpi> discoverySpiSupplier = 
TcpDiscoverySpi::new;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+        cfg.setCacheConfiguration(ccfg);
+        cfg.setCommunicationSpi(communicationSpiSupplier.get());
+        cfg.setDiscoverySpi(discoverySpiSupplier.get());
+        cfg.setClientMode(clientMode);
+
+        return cfg;
+    }
+
+    /**
+     * Discovery SPI that feeds node add finished messages to the interceptor.
+     */
+    static class NodeJoinInterceptingDiscoverySpi extends TcpDiscoverySpi {
+        /** Interceptor. */
+        protected volatile IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage> 
interceptor;
+
+        /** {@inheritDoc} */
+        @Override protected void 
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+            if (msg instanceof TcpDiscoveryNodeAddFinishedMessage && 
interceptor != null)
+                interceptor.apply((TcpDiscoveryNodeAddFinishedMessage) msg);
+        }
+    }
+
+    /**
+     * Discovery SPI that feeds unwrapped custom messages to the interceptor.
+     */
+    static class CustomMessageInterceptingDiscoverySpi extends TcpDiscoverySpi 
{
+        /** Interceptor. */
+        protected volatile IgniteInClosure<DiscoveryCustomMessage> interceptor;
+
+        /** {@inheritDoc} */
+        @Override protected void 
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+            if (!(msg instanceof TcpDiscoveryCustomEventMessage))
+                return;
+
+            TcpDiscoveryCustomEventMessage cm = 
(TcpDiscoveryCustomEventMessage)msg;
+
+            DiscoveryCustomMessage delegate;
+
+            try {
+                DiscoverySpiCustomMessage custMsg = cm.message(marshaller(),
+                    U.resolveClassLoader(ignite().configuration()));
+
+                assertNotNull(custMsg);
+
+                delegate = ((CustomMessageWrapper)custMsg).delegate();
+            }
+            catch (Throwable throwable) {
+                throw new RuntimeException(throwable);
+            }
+
+            if (interceptor != null)
+                interceptor.apply(delegate);
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientDelayedJoinTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java
similarity index 56%
rename from 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientDelayedJoinTest.java
rename to 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java
index 06aab27..1a13b15 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientDelayedJoinTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java
@@ -19,74 +19,25 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
-import java.util.function.Supplier;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.spi.communication.CommunicationSpi;
-import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
-import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
 import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 /**
- * Tests for delayed client join.
- * When client node joins to cluster it sends SingleMessage to coordinator.
- * During this time topology on server nodes can be changed,
- * because client exchange doesn't require acknowledgement for SingleMessage 
on coordinator.
- * Delay is simulated by blocking sending this SingleMessage and resume 
sending after topology is changed.
+ *
  */
-public class ClientDelayedJoinTest extends GridCommonAbstractTest {
-    /** Cache name. */
-    private static final String CACHE_NAME = "cache";
-
-    /** Cache configuration. */
-    private final CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME)
-            .setReadFromBackup(false)
-            .setBackups(1)
-            .setAffinity(new RendezvousAffinityFunction(false, 64));
-
-    /** Client mode. */
-    private boolean clientMode;
-
-    /** Communication SPI supplier. */
-    private Supplier<CommunicationSpi> communicationSpiSupplier = 
TestRecordingCommunicationSpi::new;
-
-    /** Discovery SPI supplier. */
-    private Supplier<DiscoverySpi> discoverySpiSupplier = 
CustomMessageInterceptingDiscoverySpi::new;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
-        cfg.setConsistentId(igniteInstanceName);
-        cfg.setCacheConfiguration(ccfg);
-        cfg.setCommunicationSpi(communicationSpiSupplier.get());
-        cfg.setDiscoverySpi(discoverySpiSupplier.get());
-        cfg.setClientMode(clientMode);
-
-        return cfg;
-    }
-
+public class ClientSlowDiscoveryTopologyChangeTest extends 
ClientSlowDiscoveryAbstractTest {
     /**
      *
      */
@@ -108,7 +59,12 @@ public class ClientDelayedJoinTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     *
+     * Checks that client join works correctly if a cache configured on the 
client is stopped on server nodes
+     * but the discovery event about the cache stop is not delivered to the 
client node immediately.
+     * When a client node joins the cluster, it sends a SingleMessage to the 
coordinator.
+     * During this time the topology on server nodes can change,
+     * because client exchange doesn't require acknowledgement of the 
SingleMessage on the coordinator.
+     * The delay is simulated by blocking this SingleMessage and resuming 
sending after the topology has changed.
      */
     @Test
     public void testClientJoinAndCacheStop() throws Exception {
@@ -131,9 +87,9 @@ public class ClientDelayedJoinTest extends 
GridCommonAbstractTest {
             GridDhtPartitionsSingleMessage singleMsg = 
(GridDhtPartitionsSingleMessage) msg;
 
             return Optional.ofNullable(singleMsg.exchangeId())
-                    .map(GridDhtPartitionExchangeId::topologyVersion)
-                    .filter(topVer -> topVer.equals(new 
AffinityTopologyVersion(4, 0)))
-                    .isPresent();
+                .map(GridDhtPartitionExchangeId::topologyVersion)
+                .filter(topVer -> topVer.equals(new AffinityTopologyVersion(4, 
0)))
+                .isPresent();
         });
 
         communicationSpiSupplier = () -> clientCommSpi;
@@ -150,7 +106,7 @@ public class ClientDelayedJoinTest extends 
GridCommonAbstractTest {
             DynamicCacheChangeBatch cacheChangeBatch = 
(DynamicCacheChangeBatch) msg;
 
             boolean hasCacheStopReq = cacheChangeBatch.requests().stream()
-                    .anyMatch(req -> req.stop() && 
req.cacheName().equals(CACHE_NAME));
+                .anyMatch(req -> req.stop() && 
req.cacheName().equals(CACHE_NAME));
 
             if (hasCacheStopReq)
                 U.awaitQuiet(clientDiscoSpiBlock);
@@ -185,7 +141,7 @@ public class ClientDelayedJoinTest extends 
GridCommonAbstractTest {
         }
         catch (Exception e) {
             assertTrue("Got unexpected exception during cache get " + e,
-                    X.hasCause(e, CacheStoppedException.class));
+                X.hasCause(e, CacheStoppedException.class));
         }
         finally {
             // Resume processing cache destroy on client node.
@@ -195,7 +151,7 @@ public class ClientDelayedJoinTest extends 
GridCommonAbstractTest {
         // Wait till cache destroyed on client node.
         GridTestUtils.waitForCondition(() -> {
             AffinityTopologyVersion topVer = 
client.context().cache().context().exchange().lastFinishedFuture()
-                    .topologyVersion();
+                .topologyVersion();
 
             // Cache destroy version.
             return topVer.equals(new AffinityTopologyVersion(4, 1));
@@ -203,37 +159,4 @@ public class ClientDelayedJoinTest extends 
GridCommonAbstractTest {
 
         Assert.assertNull("Cache should be destroyed on client node", 
client.cache(CACHE_NAME));
     }
-
-    /**
-     *
-     */
-    static class CustomMessageInterceptingDiscoverySpi extends TcpDiscoverySpi 
{
-        /** Interceptor. */
-        private volatile IgniteInClosure<DiscoveryCustomMessage> interceptor;
-
-        /** {@inheritDoc} */
-        @Override protected void 
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
-            if (!(msg instanceof TcpDiscoveryCustomEventMessage))
-                return;
-
-            TcpDiscoveryCustomEventMessage cm = 
(TcpDiscoveryCustomEventMessage)msg;
-
-            DiscoveryCustomMessage delegate;
-
-            try {
-                DiscoverySpiCustomMessage custMsg = cm.message(marshaller(),
-                        U.resolveClassLoader(ignite().configuration()));
-
-                assertNotNull(custMsg);
-
-                delegate = ((CustomMessageWrapper)custMsg).delegate();
-            }
-            catch (Throwable throwable) {
-                throw new RuntimeException(throwable);
-            }
-
-            if (interceptor != null)
-                interceptor.apply(delegate);
-        }
-    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java
new file mode 100644
index 0000000..51a72bc
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Maps;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.internal.util.collections.Sets;
+
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Tests for client nodes with slow discovery.
+ */
+@RunWith(Parameterized.class)
+public class ClientSlowDiscoveryTransactionRemapTest extends 
ClientSlowDiscoveryAbstractTest {
+    /** */
+    @Parameterized.Parameters(name = "isolation = {0}, concurrency = {1}, 
operation = {2}")
+    public static List<Object[]> parameters() {
+        ArrayList<Object[]> params = new ArrayList<>();
+
+        List<IgniteInClosure<TestTransaction<Integer, Integer>>> operations = 
new ArrayList<>();
+
+        operations.add(new NamedClosure<>(putRemoveSameKey, 
"putRemoveSameKey"));
+        operations.add(new NamedClosure<>(putRemoveDifferentKey, 
"putRemoveDifferentKey"));
+        operations.add(new NamedClosure<>(getPutSameKey, "getPutSameKey"));
+        operations.add(new NamedClosure<>(getPutDifferentKey, 
"getPutDifferentKey"));
+        operations.add(new NamedClosure<>(putAllRemoveAllSameKeys, 
"putAllRemoveAllSameKeys"));
+        operations.add(new NamedClosure<>(putAllRemoveAllDifferentKeys, 
"putAllRemoveAllDifferentKeys"));
+        operations.add(new NamedClosure<>(randomOperation, "random"));
+
+        for (TransactionConcurrency concurrency : 
TransactionConcurrency.values()) {
+            for (TransactionIsolation isolation : 
TransactionIsolation.values()) {
+                if (!shouldBeTested(concurrency, isolation))
+                    continue;
+
+                for (IgniteInClosure<TestTransaction<Integer, Integer>> 
operation : operations)
+                    params.add(new Object[] {concurrency, isolation, 
operation});
+            }
+        }
+
+        return params;
+    }
+
+    /**
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @return {@code True} if pair concurrency - isolation should be tested.
+     */
+    private static boolean shouldBeTested(TransactionConcurrency concurrency, 
TransactionIsolation isolation) {
+        if (concurrency == PESSIMISTIC)
+            return isolation == REPEATABLE_READ || isolation == READ_COMMITTED;
+        return concurrency == OPTIMISTIC && isolation == SERIALIZABLE;
+    }
+
+    /** Keys set. */
+    private static final int KEYS_SET = 64;
+
+    /** Put remove same key. */
+    private static IgniteInClosure<TestTransaction<Integer, Integer>> 
putRemoveSameKey = tx -> {
+        tx.put(1, 1);
+        tx.remove(1);
+        tx.put(1, 100);
+    };
+
+    /** Put remove different key. */
+    private static IgniteInClosure<TestTransaction<Integer, Integer>> 
putRemoveDifferentKey = tx -> {
+        tx.put(1, 1);
+        tx.remove(2);
+    };
+
+    /** Get put same key. */
+    private static IgniteInClosure<TestTransaction<Integer, Integer>> 
getPutSameKey = tx -> {
+        int val = tx.get(1);
+        tx.put(1, val + 1);
+    };
+
+    /** Get put different key. */
+    private static IgniteInClosure<TestTransaction<Integer, Integer>> 
getPutDifferentKey = tx -> {
+        int val = tx.get(1);
+        tx.put(2, val + 1);
+    };
+
+    /** Put all remove all same keys. */
+    private static IgniteInClosure<TestTransaction<Integer, Integer>> 
putAllRemoveAllSameKeys = tx -> {
+        tx.putAll(Maps.asMap(Sets.newSet(1, 2, 3, 4, 5), k -> k));
+        tx.removeAll(Sets.newSet(1, 2, 3, 4, 5));
+    };
+
+    /** Put all remove all different keys. */
+    private static IgniteInClosure<TestTransaction<Integer, Integer>> 
putAllRemoveAllDifferentKeys = tx -> {
+        tx.putAll(Maps.asMap(Sets.newSet(1, 2, 3, 4, 5), k -> k));
+        tx.removeAll(Sets.newSet(6, 7, 8, 9, 10));
+    };
+
+    /** Random operation. */
+    private static IgniteInClosure<TestTransaction<Integer, Integer>> 
randomOperation = tx -> {
+        long seed = ThreadLocalRandom.current().nextLong();
+        log.info("Seed: " + seed);
+        Random random = new Random(seed);
+
+        for (int it = 0; it < 10; it++) {
+            int operation = 
random.nextInt(TestTransaction.POSSIBLE_OPERATIONS);
+
+            switch (operation) {
+                // Get:
+                case 0: {
+                    int key = random.nextInt(KEYS_SET);
+
+                    tx.get(key);
+
+                    break;
+                }
+                // Put:
+                case 1: {
+                    int key = random.nextInt(KEYS_SET);
+                    int val = random.nextInt(KEYS_SET);
+
+                    tx.put(key, val);
+
+                    break;
+                }
+                // Remove:
+                case 2: {
+                    int key = random.nextInt(KEYS_SET);
+
+                    tx.remove(key);
+
+                    break;
+                }
+                // Put All:
+                case 3: {
+                    tx.putAll(
+                        random.ints(5, 0, KEYS_SET)
+                            .boxed()
+                            .distinct()
+                            .collect(toMap(
+                                    k -> k, k -> k)
+                            )
+                    );
+
+                    break;
+                }
+                // Remove All:
+                case 4: {
+                    tx.removeAll(
+                        random.ints(5, 0, KEYS_SET).boxed().collect(toSet())
+                    );
+
+                    break;
+                }
+            }
+        }
+    };
+
+    /**
+     * Interface to work with cache operations within transaction.
+     */
+    private static interface TestTransaction<K, V> {
+        /** Possible operations. */
+        static int POSSIBLE_OPERATIONS = 5;
+
+        /**
+         * @param key Key.
+         * @return Value.
+         */
+        V get(K key);
+
+        /**
+         * @param key Key.
+         * @param val Value.
+         */
+        void put(K key, V val);
+
+        /**
+         * @param key Key.
+         */
+        void remove(K key);
+
+        /**
+         * @param map Map.
+         */
+        void putAll(Map<K, V> map);
+
+        /**
+         * @param keys Keys.
+         */
+        void removeAll(Set<K> keys);
+    }
+
+    /**
+     * Closure that carries a name so that it is printed properly in test 
parameters.
+     */
+    private static class NamedClosure<K, V> implements 
IgniteInClosure<TestTransaction<K, V>> {
+        /** Closure. */
+        private final IgniteInClosure<TestTransaction<K, V>> c;
+
+        /** Name. */
+        private final String name;
+
+        /**
+         * @param c Closure.
+         * @param name Name.
+         */
+        public NamedClosure(IgniteInClosure<TestTransaction<K, V>> c, String 
name) {
+            this.c = c;
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(TestTransaction<K, V> kvTestTransaction) {
+            c.apply(kvTestTransaction);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return name;
+        }
+    }
+
+    /**
+     * Implementation for transaction operations backed by Ignite cache.
+     */
+    private static class TestTransactionEngine<K, V> implements 
TestTransaction<K, V> {
+        /** Removed. */
+        private final Object RMV = new Object();
+
+        /** Cache. */
+        private final IgniteCache<K, V> cache;
+
+        /** Map to consistency check. */
+        private final Map<K, Object> map;
+
+        /**
+         * @param cache Cache.
+         */
+        TestTransactionEngine(IgniteCache<K, V> cache) {
+            this.cache = cache;
+            this.map = new HashMap<>();
+        }
+
+        /** {@inheritDoc} */
+        @Override public V get(K key) {
+            return cache.get(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void put(K key, V val) {
+            map.put(key, val);
+            cache.put(key, val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void remove(K key) {
+            map.put(key, RMV);
+            cache.remove(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void putAll(Map<K, V> map) {
+            for (Map.Entry<K, V> entry : map.entrySet())
+                this.map.put(entry.getKey(), entry.getValue());
+            cache.putAll(map);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeAll(Set<K> keys) {
+            for (K key : keys)
+                map.put(key, RMV);
+            cache.removeAll(keys);
+        }
+
+        /**
+         * Consistency check for transaction operations.
+         */
+        public void consistencyCheck() {
+            for (Map.Entry<K, Object> entry : map.entrySet()) {
+                if (entry.getValue() == RMV)
+                    Assert.assertNull("Value is not null for key: " + 
entry.getKey(), cache.get(entry.getKey()));
+                else
+                    Assert.assertEquals("Values are different for key: " + 
entry.getKey(),
+                        entry.getValue(),
+                        cache.get(entry.getKey())
+                    );
+            }
+        }
+    }
+
+    /** Concurrency. */
+    @Parameterized.Parameter(0)
+    public TransactionConcurrency concurrency;
+
+    /** Isolation. */
+    @Parameterized.Parameter(1)
+    public TransactionIsolation isolation;
+
+    /** Operation. */
+    @Parameterized.Parameter(2)
+    public IgniteInClosure<TestTransaction<?, ?>> operation;
+
+    /** Client disco spi block. */
+    private CountDownLatch clientDiscoSpiBlock;
+
+    /** Client node to perform operations. */
+    private IgniteEx clnt;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        startGrid(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** */
+    @Before
+    public void before() throws Exception {
+        clientMode = true;
+
+        NodeJoinInterceptingDiscoverySpi clientDiscoSpi = new 
NodeJoinInterceptingDiscoverySpi();
+
+        clientDiscoSpiBlock = new CountDownLatch(1);
+
+        // Delay node join of second client.
+        clientDiscoSpi.interceptor = msg -> {
+            if (msg.nodeId().toString().endsWith("2"))
+                U.awaitQuiet(clientDiscoSpiBlock);
+        };
+
+        discoverySpiSupplier = () -> clientDiscoSpi;
+
+        clnt = startGrid(1);
+
+        for (int k = 0; k < 64; k++)
+            clnt.cache(CACHE_NAME).put(k, 0);
+
+        discoverySpiSupplier = TcpDiscoverySpi::new;
+
+        startGrid(2);
+    }
+
+    /** */
+    @After
+    public void after() throws Exception {
+        // Stop client nodes.
+        stopGrid(1);
+        stopGrid(2);
+    }
+
+    /** */
+    @Test
+    public void testTransactionRemap() throws Exception {
+        TestTransactionEngine engine = new 
TestTransactionEngine<>(clnt.cache(CACHE_NAME));
+
+        IgniteInternalFuture<?> txFut = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = clnt.transactions().txStart(concurrency, 
isolation)) {
+                operation.apply(engine);
+
+                tx.commit();
+            }
+        });
+
+        try {
+            txFut.get(1, TimeUnit.SECONDS);
+        }
+        catch (IgniteFutureTimeoutCheckedException te) {
+            // Expected.
+        }
+        finally {
+            clientDiscoSpiBlock.countDown();
+        }
+
+        // After the second client join is resumed, the transaction should successfully 
await the new affinity and commit.
+        txFut.get();
+
+        // Check consistency after transaction commit.
+        engine.consistencyCheck();
+    }
+
+    /** */
+    @Test
+    public void testTransactionRemapWithTimeout() throws Exception {
+        TestTransactionEngine engine = new 
TestTransactionEngine<>(clnt.cache(CACHE_NAME));
+
+        IgniteInternalFuture<?> txFut = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = clnt.transactions().txStart(concurrency, 
isolation, 1_000, 1_000_000)) {
+                operation.apply(engine);
+
+                tx.commit();
+            }
+        });
+
+        try {
+            txFut.get(2, TimeUnit.SECONDS);
+        }
+        catch (IgniteFutureTimeoutCheckedException te) {
+            // Expected.
+        }
+        finally {
+            clientDiscoSpiBlock.countDown();
+        }
+
+        // After the second client join is resumed, the transaction should be timed out 
and rolled back.
+        if (concurrency == PESSIMISTIC) {
+            assertThrowsWithCause((Callable<Object>) txFut::get, 
TransactionTimeoutException.class);
+
+            // Check that the initial data is not changed by the rolled-back 
transaction.
+            for (int k = 0; k < KEYS_SET; k++)
+                Assert.assertEquals("Cache consistency is broken for key: " + 
k, 0, clnt.cache(CACHE_NAME).get(k));
+        }
+        else {
+            txFut.get();
+
+            engine.consistencyCheck();
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
index 73024f6..a22e341 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
@@ -17,12 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
@@ -31,16 +29,19 @@ import 
org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheUtils;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
@@ -130,7 +131,19 @@ public class TxCrossCacheMapOnInvalidTopologyTest extends 
GridCommonAbstractTest
     }
 
     /**
+     * Test scenario: a cross-cache tx is started after a node has left in the 
middle of rebalance; the first cache is rebalanced
+     * and the second is partially rebalanced.
      *
+     * First cache map request will trigger client compatible remap for 
pessimistic txs,
+     * second cache map request should use new topology version.
+     *
+     * For optimistic tx remap is enforced if more than one mapping in 
transaction or all enlisted caches have compatible
+     * assignments.
+     *
+     * Success: tx is finished on ideal topology version over all mapped nodes.
+     *
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
      */
     private void doTestCrossCacheTxMapOnInvalidTopology(TransactionConcurrency 
concurrency, TransactionIsolation isolation) throws Exception {
         try {
@@ -139,7 +152,7 @@ public class TxCrossCacheMapOnInvalidTopologyTest extends 
GridCommonAbstractTest
 
             awaitPartitionMapExchange();
 
-            Ignite client = startGrid("client");
+            IgniteEx client = startGrid("client");
             assertNotNull(client.cache(CACHE1));
             assertNotNull(client.cache(CACHE2));
 
@@ -159,9 +172,12 @@ public class TxCrossCacheMapOnInvalidTopologyTest extends 
GridCommonAbstractTest
 
             final AffinityTopologyVersion joinVer = new 
AffinityTopologyVersion(4, 0);
             AffinityTopologyVersion leftVer = new AffinityTopologyVersion(5, 
0);
+            AffinityTopologyVersion idealVer = new AffinityTopologyVersion(5, 
1);
 
             AtomicReference<Set<Integer>> full = new AtomicReference<>();
 
+            GridConcurrentSkipListSet<Integer> leftVerParts = new 
GridConcurrentSkipListSet<>();
+
             crdSpi.blockMessages((node, m) -> {
                 if (m instanceof GridDhtPartitionSupplyMessage) {
                     GridDhtPartitionSupplyMessage msg = 
(GridDhtPartitionSupplyMessage)m;
@@ -183,7 +199,20 @@ public class TxCrossCacheMapOnInvalidTopologyTest extends 
GridCommonAbstractTest
                         return true;
                     }
 
-                    if (msg.topologyVersion().equals(leftVer))
+                    if (msg.topologyVersion().equals(leftVer)) {
+                        Map<Integer, Long> last = U.field(msg, "last");
+
+                        leftVerParts.addAll(last.keySet());
+
+                        return true;
+                    }
+                } else if (m instanceof GridDhtPartitionsFullMessage) {
+                    GridDhtPartitionsFullMessage msg = 
(GridDhtPartitionsFullMessage)m;
+
+                    // Delay full message for ideal topology switch.
+                    GridDhtPartitionExchangeId exchId = msg.exchangeId();
+
+                    if (exchId != null && 
exchId.topologyVersion().equals(idealVer))
                         return true;
                 }
 
@@ -237,76 +266,104 @@ public class TxCrossCacheMapOnInvalidTopologyTest 
extends GridCommonAbstractTest
             
grid(0).cachex(CACHE2).context().affinity().affinityReadyFuture(leftVer).get();
             
grid(2).cachex(CACHE2).context().affinity().affinityReadyFuture(leftVer).get();
 
+            AffinityAssignment assignment0 = 
grid(0).cachex(CACHE1).context().affinity().assignment(leftVer);
             AffinityAssignment assignment = 
grid(0).cachex(CACHE2).context().affinity().assignment(leftVer);
 
             // Search for a partition with incompatible assignment.
-            int movingPart = -1;
+            int stablePart = -1; // Partition for cache1 which is mapped for 
both late and ideal topologies to the same primary.
+            int movingPart = -1; // Partition for cache2 which is mapped for 
both late and ideal topologies on different primaries.
 
-            for (int p = 0; p < assignment.assignment().size(); p++) {
-                List<ClusterNode> nodes = assignment.assignment().get(p);
-                List<ClusterNode> nodes2 = assignment.idealAssignment().get(p);
+            for (int p = 0; p < assignment0.assignment().size(); p++) {
+                List<ClusterNode> curr = assignment.assignment().get(p);
+                List<ClusterNode> ideal = assignment.idealAssignment().get(p);
 
-                if (!nodes.equals(nodes2) && nodes.get(0).order() == 1)
-                    movingPart = p;
+                if (curr.equals(ideal) && curr.get(0).order() == 1) {
+                    stablePart = p;
+
+                    break;
+                }
             }
 
-            assertFalse(movingPart == -1);
+            assertFalse(stablePart == -1);
 
-            // Delay rebalance for next top ver.
-            TestRecordingCommunicationSpi.spi(grid(2)).blockMessages((node, 
message) -> {
-                if (message instanceof GridDhtPartitionsSingleMessage) {
-                    GridDhtPartitionsSingleMessage sm = 
(GridDhtPartitionsSingleMessage)message;
+            for (int p = 0; p < assignment.assignment().size(); p++) {
+                List<ClusterNode> curr = assignment.assignment().get(p);
+                List<ClusterNode> ideal = assignment.idealAssignment().get(p);
 
-                    return sm.exchangeId() != null;
+                if (!curr.equals(ideal) && curr.get(0).order() == 1) {
+                    movingPart = p;
+
+                    break;
                 }
+            }
 
-                return false;
-            });
+            assertFalse(movingPart == -1);
 
             TestRecordingCommunicationSpi.spi(client).blockMessages(new 
IgniteBiPredicate<ClusterNode, Message>() {
-                @Override public boolean apply(ClusterNode node, Message 
message) {
+                @Override public boolean apply(ClusterNode node, Message msg) {
                     if (concurrency == PESSIMISTIC)
-                        return message instanceof GridNearLockRequest;
+                        return msg instanceof GridNearLockRequest;
                     else
-                        return message instanceof GridNearTxPrepareRequest;
+                        return msg instanceof GridNearTxPrepareRequest;
                 }
             });
 
+            final int finalStablePart = stablePart;
             final int finalMovingPart = movingPart;
-            IgniteInternalFuture<?> txFut = multithreadedAsync(new Runnable() {
-                @Override public void run() {
-                    try (Transaction tx = 
client.transactions().txStart(concurrency, isolation)) {
-                        Map<Integer, Integer> map = new LinkedHashMap<>();
 
-                        for (int p = 0; p < PARTS_CNT; p++)
-                            map.put(p, p);
+            IgniteInternalFuture<?> txFut = multithreadedAsync(() -> {
+                try (Transaction tx = 
client.transactions().txStart(concurrency, isolation)) {
+                    client.cache(CACHE1).put(finalStablePart, 0); // Will map 
on crd(order=1).
 
-                        client.cache(CACHE1).putAll(map); // Will successfully 
lock topology.
-                        client.cache(CACHE2).put(finalMovingPart, 0); // 
Should remap but will go through without fix.
+                    // Next request will remap to ideal topology, but it's not 
ready on other node except crd.
+                    client.cache(CACHE2).put(finalMovingPart, 0);
 
-                        tx.commit();
-                    }
+                    tx.commit();
                 }
             }, 1, "tx-thread");
 
+            // Wait until all missing supply messages are blocked.
+            assertTrue(GridTestUtils.waitForCondition(() -> 
leftVerParts.size() == PARTS_CNT - full.get().size(), 5_000));
+
+            // Delay first lock request on late topology.
             TestRecordingCommunicationSpi.spi(client).waitForBlocked();
 
-            crdSpi.stopBlock(); // Continue rebalance and trigger next 
topology switch.
+            // At this point only supply messages should be blocked.
+            // Unblock to continue rebalance and trigger ideal topology switch.
+            crdSpi.stopBlock(true, null, false, true);
+
+            // Wait until ideal topology is ready on crd.
+            
crd.context().cache().context().exchange().affinityReadyFuture(idealVer).get(10_000);
 
-            TestRecordingCommunicationSpi.spi(grid(2)).waitForBlocked();
+            // Other node must wait for full message.
+            assertFalse(GridTestUtils.waitForCondition(() ->
+                
grid(2).context().cache().context().exchange().affinityReadyFuture(idealVer).isDone(),
 1_000));
 
+            // Map on unstable topology (PME is in progress on other node).
             TestRecordingCommunicationSpi.spi(client).stopBlock();
 
-            doSleep(5000); // Make sure request will listen for current 
topology future completion.
+            // Capture local transaction.
+            IgniteInternalTx tx0 = 
client.context().cache().context().tm().activeTransactions().iterator().next();
 
-            TestRecordingCommunicationSpi.spi(grid(2)).stopBlock();
+            // Expected behavior: tx must hang (both pessimistic and 
optimistic) because topology is not ready.
+            try {
+                txFut.get(3_000);
 
-            crdSpi.stopBlock();
+                fail("TX must not complete");
+            }
+            catch (IgniteFutureTimeoutCheckedException e) {
+                // Expected.
+            }
 
-            awaitPartitionMapExchange();
+            crdSpi.stopBlock();
 
             txFut.get();
 
+            // Check transaction map version. Should be mapped on ideal 
topology.
+            assertEquals(tx0.topologyVersionSnapshot(), idealVer);
+
+            awaitPartitionMapExchange();
+
             checkFutures();
         }
         finally {
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
index 183f90a..5477a36 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
@@ -469,9 +469,7 @@ public class TxPartitionCounterStateConsistencyTest extends 
TxPartitionCounterSt
      */
     @Test
     public void 
testPartitionConsistencyDuringRebalanceAndConcurrentUpdates_NoOp() throws 
Exception {
-        
testPartitionConsistencyDuringRebalanceConcurrentlyWithTopologyChange(s -> {
-        }, s -> {
-        });
+        
testPartitionConsistencyDuringRebalanceConcurrentlyWithTopologyChange(s -> {}, 
s -> {});
     }
 
     /**
@@ -629,9 +627,9 @@ public class TxPartitionCounterStateConsistencyTest extends 
TxPartitionCounterSt
         Integer key0 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
         Integer key = primaryKey(grid(0).cache(DEFAULT_CACHE_NAME));
 
-        TestRecordingCommunicationSpi spi = 
TestRecordingCommunicationSpi.spi(crd);
+        TestRecordingCommunicationSpi crdSpi = 
TestRecordingCommunicationSpi.spi(crd);
 
-        spi.blockMessages((node, message) -> {
+        crdSpi.blockMessages((node, message) -> {
             if (message instanceof GridDhtPartitionsFullMessage) {
                 GridDhtPartitionsFullMessage tmp = 
(GridDhtPartitionsFullMessage)message;
 
@@ -644,11 +642,11 @@ public class TxPartitionCounterStateConsistencyTest 
extends TxPartitionCounterSt
         // Locks mapped wait.
         CountDownLatch l = new CountDownLatch(1);
 
-        IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
+        IgniteInternalFuture startNodeFut = GridTestUtils.runAsync(() -> {
             U.awaitQuiet(l);
 
             try {
-                startGrid(SERVER_NODES);
+                startGrid(SERVER_NODES); // Start node out of BLT.
             }
             catch (Exception e) {
                 fail(X.getFullStackTrace(e));
@@ -662,7 +660,7 @@ public class TxPartitionCounterStateConsistencyTest extends 
TxPartitionCounterSt
         });
 
         IgniteInternalFuture txFut = GridTestUtils.runAsync(() -> {
-            try(Transaction tx = client.transactions().txStart()) {
+            try (Transaction tx = client.transactions().txStart()) {
                 Map<Integer, Integer> map = new LinkedHashMap<>();
 
                 map.put(key, key); // clientFirst=true in lockAll.
@@ -680,7 +678,7 @@ public class TxPartitionCounterStateConsistencyTest extends 
TxPartitionCounterSt
 
                 l.countDown();
 
-                spi.waitForBlocked(); // Block PME after finish on crd and 
wait on others.
+                crdSpi.waitForBlocked(); // Block PME after finish on crd and 
wait on others.
 
                 cliSpi.stopBlock(); // Start remote lock mapping.
             }
@@ -689,12 +687,10 @@ public class TxPartitionCounterStateConsistencyTest 
extends TxPartitionCounterSt
             }
         });
 
-        txFut.get(); // Transaction should not be blocked by concurrent PME.
         lockFut.get();
-
-        spi.stopBlock();
-
-        fut.get();
+        crdSpi.stopBlock();
+        txFut.get();
+        startNodeFut.get();
 
         awaitPartitionMapExchange();
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
index 5a6b244..bb77047 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
@@ -28,6 +28,7 @@ import org.apache.ignite.cache.affinity.AffinityHistoryCleanupTest;
 import org.apache.ignite.cache.affinity.local.LocalAffinityFunctionTest;
 import org.apache.ignite.internal.GridCachePartitionExchangeManagerHistSizeTest;
 import org.apache.ignite.internal.processors.cache.CacheSerializableTransactionsTest;
+import org.apache.ignite.internal.processors.cache.ClientSlowDiscoveryTransactionRemapTest;
 import org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTest;
 import org.apache.ignite.internal.processors.cache.ClusterStatePartitionedSelfTest;
 import org.apache.ignite.internal.processors.cache.ClusterStateReplicatedSelfTest;
@@ -89,6 +90,7 @@ public class IgniteCacheMvccTestSuite5 {
         ignoredTests.add(GridCachePartitionExchangeManagerHistSizeTest.class);
         ignoredTests.add(ConcurrentCacheStartTest.class);
         ignoredTests.add(IgniteCacheReadThroughEvictionsVariationsSuite.class);
+        ignoredTests.add(ClientSlowDiscoveryTransactionRemapTest.class);
 
         return IgniteCacheTestSuite5.suite(ignoredTests);
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index 756a913..c1684b0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@ -32,7 +32,8 @@ import org.apache.ignite.internal.processors.cache.CacheKeepBinaryTransactionTes
 import org.apache.ignite.internal.processors.cache.CacheNearReaderUpdateTest;
 import org.apache.ignite.internal.processors.cache.CacheRebalancingSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheSerializableTransactionsTest;
-import org.apache.ignite.internal.processors.cache.ClientDelayedJoinTest;
+import org.apache.ignite.internal.processors.cache.ClientSlowDiscoveryTopologyChangeTest;
+import org.apache.ignite.internal.processors.cache.ClientSlowDiscoveryTransactionRemapTest;
 import org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTest;
 import org.apache.ignite.internal.processors.cache.ClusterStatePartitionedSelfTest;
 import org.apache.ignite.internal.processors.cache.ClusterStateReplicatedSelfTest;
@@ -121,7 +122,8 @@ public class IgniteCacheTestSuite5 {
 
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheReadThroughEvictionsVariationsSuite.class, ignoredTests);
 
-        GridTestUtils.addTestIfNeeded(suite, ClientDelayedJoinTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, ClientSlowDiscoveryTopologyChangeTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, ClientSlowDiscoveryTransactionRemapTest.class, ignoredTests);
 
         //GridTestUtils.addTestIfNeeded(suite, GridCacheAtomicPreloadSelfTest.class, ignoredTests);
 

Reply via email to