This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6c41ed2 IGNITE-12328 IgniteException "Failed to resolve nodes
topology" during cache.removeAll() and constantly changing topology - Fixes
#7015.
6c41ed2 is described below
commit 6c41ed2ac91070d334a2c4eddf8a4dfebc44037b
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Tue Oct 29 14:22:00 2019 +0300
IGNITE-12328 IgniteException "Failed to resolve nodes topology" during
cache.removeAll() and constantly changing topology - Fixes #7015.
Signed-off-by: Ivan Rakov <[email protected]>
---
.../processors/cache/GridCacheIdMessage.java | 2 +-
.../cache/distributed/dht/GridDhtCacheAdapter.java | 17 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 +-
.../distributed/dht/atomic/GridDhtAtomicCache.java | 10 +-
.../dht/colocated/GridDhtColocatedLockFuture.java | 39 +-
.../cache/distributed/near/GridNearLockFuture.java | 4 -
.../cache/ClientSlowDiscoveryAbstractTest.java | 121 ++++++
... => ClientSlowDiscoveryTopologyChangeTest.java} | 105 +----
.../ClientSlowDiscoveryTransactionRemapTest.java | 473 +++++++++++++++++++++
.../TxCrossCacheMapOnInvalidTopologyTest.java | 133 ++++--
.../TxPartitionCounterStateConsistencyTest.java | 24 +-
.../testsuites/IgniteCacheMvccTestSuite5.java | 2 +
.../ignite/testsuites/IgniteCacheTestSuite5.java | 6 +-
13 files changed, 758 insertions(+), 180 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
index e094439..84d890c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java
@@ -112,6 +112,6 @@ public abstract class GridCacheIdMessage extends
GridCacheMessage {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridCacheIdMessage.class, this);
+ return S.toString(GridCacheIdMessage.class, this, "super",
super.toString());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 31adc06..da03d00 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1274,30 +1274,29 @@ public abstract class GridDhtCacheAdapter<K, V> extends
GridDistributedCacheAdap
}
/**
- * @param expVer Expected topology version.
+ * @param mapVer Mapped topology version.
* @param curVer Current topology version.
* @return {@code True} if cache affinity changed and operation should be
remapped.
*/
- protected final boolean needRemap(AffinityTopologyVersion expVer,
AffinityTopologyVersion curVer,
- Collection<KeyCacheObject> keys) {
- if (curVer.equals(expVer))
+ protected final boolean needRemap(AffinityTopologyVersion mapVer,
AffinityTopologyVersion curVer) {
+ if (curVer.equals(mapVer))
return false;
- AffinityTopologyVersion lastAffChangedTopVer =
ctx.shared().exchange().lastAffinityChangedTopologyVersion(expVer);
+ AffinityTopologyVersion lastAffChangedTopVer =
ctx.shared().exchange().lastAffinityChangedTopologyVersion(mapVer);
- if (curVer.compareTo(lastAffChangedTopVer) >= 0 &&
curVer.compareTo(expVer) <= 0)
+ if (curVer.isBetween(lastAffChangedTopVer, mapVer))
return false;
// TODO IGNITE-7164 check mvcc crd for mvcc enabled txs.
- Collection<ClusterNode> cacheNodes0 =
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer);
+ Collection<ClusterNode> cacheNodes0 =
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), mapVer);
Collection<ClusterNode> cacheNodes1 =
ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer);
- if (!cacheNodes0.equals(cacheNodes1) ||
ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
+ if (!cacheNodes0.equals(cacheNodes1) ||
ctx.affinity().affinityTopologyVersion().before(curVer))
return true;
try {
- List<List<ClusterNode>> aff1 = ctx.affinity().assignments(expVer);
+ List<List<ClusterNode>> aff1 = ctx.affinity().assignments(mapVer);
List<List<ClusterNode>> aff2 = ctx.affinity().assignments(curVer);
return !aff1.equals(aff2);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 8fdc6fd..c7f30c7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1083,7 +1083,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K,
V> extends GridDhtCach
}
try {
- if (top != null && needRemap(req.topologyVersion(),
top.readyTopologyVersion(), req.keys())) {
+ if (top != null && needRemap(req.topologyVersion(),
top.readyTopologyVersion())) {
if (log.isDebugEnabled()) {
log.debug("Client topology version mismatch, need
remap lock request [" +
"reqTopVer=" + req.topologyVersion() +
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4c47c0c..926530d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1836,10 +1836,16 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
// Do not check topology version if topology was
locked on near node by
// external transaction or explicit lock.
if (!req.topologyLocked()) {
+ AffinityTopologyVersion waitVer =
top.topologyVersionFuture().initialVersion();
+
+ // No need to remap if next future version is
compatible.
+ boolean compatible =
+
waitVer.isBetween(req.lastAffinityChangedTopologyVersion(),
req.topologyVersion());
+
// Can not wait for topology future since it
will break
// GridNearAtomicCheckUpdateRequest processing.
- remap = !top.topologyVersionFuture().isDone()
||
- needRemap(req.topologyVersion(),
top.readyTopologyVersion(), req.keys());
+ remap = !compatible &&
!top.topologyVersionFuture().isDone() ||
+ needRemap(req.topologyVersion(),
top.readyTopologyVersion());
}
if (!remap) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 6731405..b0a0e6d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -178,6 +178,9 @@ public final class GridDhtColocatedLockFuture extends
GridCacheCompoundIdentityF
/** {@code True} when mappings are ready for processing. */
private boolean mappingsReady;
+ /** */
+ private boolean trackable = true;
+
/**
* @param cctx Registry.
* @param keys Keys to lock.
@@ -262,12 +265,12 @@ public final class GridDhtColocatedLockFuture extends
GridCacheCompoundIdentityF
/** {@inheritDoc} */
@Override public boolean trackable() {
- return true;
+ return trackable;
}
/** {@inheritDoc} */
@Override public void markNotTrackable() {
- throw new UnsupportedOperationException();
+ trackable = false;
}
/**
@@ -772,20 +775,24 @@ public final class GridDhtColocatedLockFuture extends
GridCacheCompoundIdentityF
topVer = tx.topologyVersionSnapshot();
if (topVer != null) {
- for (GridDhtTopologyFuture fut :
cctx.shared().exchange().exchangeFutures()) {
- if (fut.exchangeDone() &&
fut.topologyVersion().equals(topVer)) {
- Throwable err = null;
+ AffinityTopologyVersion lastChangeVer =
cctx.shared().exchange().lastAffinityChangedTopologyVersion(topVer);
- // Before cache validation, make sure that this topology
future is already completed.
- try {
- fut.get();
- }
- catch (IgniteCheckedException e) {
- err = fut.error();
- }
+ IgniteInternalFuture<AffinityTopologyVersion> affFut =
cctx.shared().exchange().affinityReadyFuture(lastChangeVer);
- if (err == null)
- err = fut.validateCache(cctx, recovery, read, null,
keys);
+ if (!affFut.isDone()) {
+ try {
+ affFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(err);
+
+ return;
+ }
+ }
+
+ for (GridDhtTopologyFuture fut :
cctx.shared().exchange().exchangeFutures()) {
+ if (fut.exchangeDone() &&
fut.topologyVersion().equals(lastChangeVer)) {
+ Throwable err = fut.validateCache(cctx, recovery, read,
null, keys);
if (err != null) {
onDone(err);
@@ -1667,10 +1674,6 @@ public final class GridDhtColocatedLockFuture extends
GridCacheCompoundIdentityF
if (res.compatibleRemapVersion()) {
if (tx != null) {
- // Versions are compatible.
- cctx.shared().exchange().
-
lastAffinityChangedTopologyVersion(res.clientRemapVersion(),
tx.topologyVersionSnapshot());
-
tx.onRemap(res.clientRemapVersion(), false);
// Use remapped version for all subsequent mappings.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 97535f5..5a4ee46 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1647,10 +1647,6 @@ public final class GridNearLockFuture extends
GridCacheCompoundIdentityFuture<Bo
if (res.compatibleRemapVersion()) {
if (tx != null) {
- // Versions are compatible.
- cctx.shared().exchange().
-
lastAffinityChangedTopologyVersion(res.clientRemapVersion(),
tx.topologyVersionSnapshot());
-
tx.onRemap(res.clientRemapVersion(), false);
// Use remapped version for all subsequent mappings.
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
new file mode 100644
index 0000000..24555b5
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.function.Supplier;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class ClientSlowDiscoveryAbstractTest extends GridCommonAbstractTest {
+ /** Cache name. */
+ protected static final String CACHE_NAME = "cache";
+
+ /** Cache configuration. */
+ private final CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setReadFromBackup(false)
+ .setBackups(1)
+ .setAffinity(new RendezvousAffinityFunction(false, 64));
+
+ /** Client mode. */
+ protected boolean clientMode;
+
+ /** Communication SPI supplier. */
+ protected Supplier<CommunicationSpi> communicationSpiSupplier =
TestRecordingCommunicationSpi::new;
+
+ /** Discovery SPI supplier. */
+ protected Supplier<DiscoverySpi> discoverySpiSupplier =
TcpDiscoverySpi::new;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+ cfg.setCacheConfiguration(ccfg);
+ cfg.setCommunicationSpi(communicationSpiSupplier.get());
+ cfg.setDiscoverySpi(discoverySpiSupplier.get());
+ cfg.setClientMode(clientMode);
+
+ return cfg;
+ }
+
+ /**
+ *
+ */
+ static class NodeJoinInterceptingDiscoverySpi extends TcpDiscoverySpi {
+ /** Interceptor. */
+ protected volatile IgniteInClosure<TcpDiscoveryNodeAddFinishedMessage>
interceptor;
+
+ /** {@inheritDoc} */
+ @Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+ if (msg instanceof TcpDiscoveryNodeAddFinishedMessage &&
interceptor != null)
+ interceptor.apply((TcpDiscoveryNodeAddFinishedMessage) msg);
+ }
+ }
+
+ /**
+ *
+ */
+ static class CustomMessageInterceptingDiscoverySpi extends TcpDiscoverySpi
{
+ /** Interceptor. */
+ protected volatile IgniteInClosure<DiscoveryCustomMessage> interceptor;
+
+ /** {@inheritDoc} */
+ @Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+ if (!(msg instanceof TcpDiscoveryCustomEventMessage))
+ return;
+
+ TcpDiscoveryCustomEventMessage cm =
(TcpDiscoveryCustomEventMessage)msg;
+
+ DiscoveryCustomMessage delegate;
+
+ try {
+ DiscoverySpiCustomMessage custMsg = cm.message(marshaller(),
+ U.resolveClassLoader(ignite().configuration()));
+
+ assertNotNull(custMsg);
+
+ delegate = ((CustomMessageWrapper)custMsg).delegate();
+ }
+ catch (Throwable throwable) {
+ throw new RuntimeException(throwable);
+ }
+
+ if (interceptor != null)
+ interceptor.apply(delegate);
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientDelayedJoinTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java
similarity index 56%
rename from
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientDelayedJoinTest.java
rename to
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java
index 06aab27..1a13b15 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientDelayedJoinTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTopologyChangeTest.java
@@ -19,74 +19,25 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
-import java.util.function.Supplier;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.spi.communication.CommunicationSpi;
-import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
-import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
- * Tests for delayed client join.
- * When client node joins to cluster it sends SingleMessage to coordinator.
- * During this time topology on server nodes can be changed,
- * because client exchange doesn't require acknowledgement for SingleMessage
on coordinator.
- * Delay is simulated by blocking sending this SingleMessage and resume
sending after topology is changed.
+ *
*/
-public class ClientDelayedJoinTest extends GridCommonAbstractTest {
- /** Cache name. */
- private static final String CACHE_NAME = "cache";
-
- /** Cache configuration. */
- private final CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME)
- .setReadFromBackup(false)
- .setBackups(1)
- .setAffinity(new RendezvousAffinityFunction(false, 64));
-
- /** Client mode. */
- private boolean clientMode;
-
- /** Communication SPI supplier. */
- private Supplier<CommunicationSpi> communicationSpiSupplier =
TestRecordingCommunicationSpi::new;
-
- /** Discovery SPI supplier. */
- private Supplier<DiscoverySpi> discoverySpiSupplier =
CustomMessageInterceptingDiscoverySpi::new;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
- cfg.setConsistentId(igniteInstanceName);
- cfg.setCacheConfiguration(ccfg);
- cfg.setCommunicationSpi(communicationSpiSupplier.get());
- cfg.setDiscoverySpi(discoverySpiSupplier.get());
- cfg.setClientMode(clientMode);
-
- return cfg;
- }
-
+public class ClientSlowDiscoveryTopologyChangeTest extends
ClientSlowDiscoveryAbstractTest {
/**
*
*/
@@ -108,7 +59,12 @@ public class ClientDelayedJoinTest extends
GridCommonAbstractTest {
}
/**
- *
+ * Test check that client join works well if cache configured on it
stopped on server nodes
+ * but discovery event about cache stop is not delivered to client node
immediately.
+ * When client node joins to cluster it sends SingleMessage to coordinator.
+ * During this time topology on server nodes can be changed,
+ * because client exchange doesn't require acknowledgement for
SingleMessage on coordinator.
+ * Delay is simulated by blocking sending this SingleMessage and resume
sending after topology is changed.
*/
@Test
public void testClientJoinAndCacheStop() throws Exception {
@@ -131,9 +87,9 @@ public class ClientDelayedJoinTest extends
GridCommonAbstractTest {
GridDhtPartitionsSingleMessage singleMsg =
(GridDhtPartitionsSingleMessage) msg;
return Optional.ofNullable(singleMsg.exchangeId())
- .map(GridDhtPartitionExchangeId::topologyVersion)
- .filter(topVer -> topVer.equals(new
AffinityTopologyVersion(4, 0)))
- .isPresent();
+ .map(GridDhtPartitionExchangeId::topologyVersion)
+ .filter(topVer -> topVer.equals(new AffinityTopologyVersion(4,
0)))
+ .isPresent();
});
communicationSpiSupplier = () -> clientCommSpi;
@@ -150,7 +106,7 @@ public class ClientDelayedJoinTest extends
GridCommonAbstractTest {
DynamicCacheChangeBatch cacheChangeBatch =
(DynamicCacheChangeBatch) msg;
boolean hasCacheStopReq = cacheChangeBatch.requests().stream()
- .anyMatch(req -> req.stop() &&
req.cacheName().equals(CACHE_NAME));
+ .anyMatch(req -> req.stop() &&
req.cacheName().equals(CACHE_NAME));
if (hasCacheStopReq)
U.awaitQuiet(clientDiscoSpiBlock);
@@ -185,7 +141,7 @@ public class ClientDelayedJoinTest extends
GridCommonAbstractTest {
}
catch (Exception e) {
assertTrue("Got unexpected exception during cache get " + e,
- X.hasCause(e, CacheStoppedException.class));
+ X.hasCause(e, CacheStoppedException.class));
}
finally {
// Resume processing cache destroy on client node.
@@ -195,7 +151,7 @@ public class ClientDelayedJoinTest extends
GridCommonAbstractTest {
// Wait till cache destroyed on client node.
GridTestUtils.waitForCondition(() -> {
AffinityTopologyVersion topVer =
client.context().cache().context().exchange().lastFinishedFuture()
- .topologyVersion();
+ .topologyVersion();
// Cache destroy version.
return topVer.equals(new AffinityTopologyVersion(4, 1));
@@ -203,37 +159,4 @@ public class ClientDelayedJoinTest extends
GridCommonAbstractTest {
Assert.assertNull("Cache should be destroyed on client node",
client.cache(CACHE_NAME));
}
-
- /**
- *
- */
- static class CustomMessageInterceptingDiscoverySpi extends TcpDiscoverySpi
{
- /** Interceptor. */
- private volatile IgniteInClosure<DiscoveryCustomMessage> interceptor;
-
- /** {@inheritDoc} */
- @Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
- if (!(msg instanceof TcpDiscoveryCustomEventMessage))
- return;
-
- TcpDiscoveryCustomEventMessage cm =
(TcpDiscoveryCustomEventMessage)msg;
-
- DiscoveryCustomMessage delegate;
-
- try {
- DiscoverySpiCustomMessage custMsg = cm.message(marshaller(),
- U.resolveClassLoader(ignite().configuration()));
-
- assertNotNull(custMsg);
-
- delegate = ((CustomMessageWrapper)custMsg).delegate();
- }
- catch (Throwable throwable) {
- throw new RuntimeException(throwable);
- }
-
- if (interceptor != null)
- interceptor.apply(delegate);
- }
- }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java
new file mode 100644
index 0000000..51a72bc
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryTransactionRemapTest.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Maps;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.internal.util.collections.Sets;
+
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Tests for client nodes with slow discovery.
+ */
+@RunWith(Parameterized.class)
+public class ClientSlowDiscoveryTransactionRemapTest extends
ClientSlowDiscoveryAbstractTest {
+ /** */
+ @Parameterized.Parameters(name = "isolation = {0}, concurrency = {1},
operation = {2}")
+ public static List<Object[]> parameters() {
+ ArrayList<Object[]> params = new ArrayList<>();
+
+ List<IgniteInClosure<TestTransaction<Integer, Integer>>> operations =
new ArrayList<>();
+
+ operations.add(new NamedClosure<>(putRemoveSameKey,
"putRemoveSameKey"));
+ operations.add(new NamedClosure<>(putRemoveDifferentKey,
"putRemoveDifferentKey"));
+ operations.add(new NamedClosure<>(getPutSameKey, "getPutSameKey"));
+ operations.add(new NamedClosure<>(getPutDifferentKey,
"getPutDifferentKey"));
+ operations.add(new NamedClosure<>(putAllRemoveAllSameKeys,
"putAllRemoveAllSameKeys"));
+ operations.add(new NamedClosure<>(putAllRemoveAllDifferentKeys,
"putAllRemoveAllDifferentKeys"));
+ operations.add(new NamedClosure<>(randomOperation, "random"));
+
+ for (TransactionConcurrency concurrency :
TransactionConcurrency.values()) {
+ for (TransactionIsolation isolation :
TransactionIsolation.values()) {
+ if (!shouldBeTested(concurrency, isolation))
+ continue;
+
+ for (IgniteInClosure<TestTransaction<Integer, Integer>>
operation : operations)
+ params.add(new Object[] {concurrency, isolation,
operation});
+ }
+ }
+
+ return params;
+ }
+
+ /**
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @return {@code True} if pair concurrency - isolation should be tested.
+ */
+ private static boolean shouldBeTested(TransactionConcurrency concurrency,
TransactionIsolation isolation) {
+ if (concurrency == PESSIMISTIC)
+ return isolation == REPEATABLE_READ || isolation == READ_COMMITTED;
+ return concurrency == OPTIMISTIC && isolation == SERIALIZABLE;
+ }
+
+ /** Keys set. */
+ private static final int KEYS_SET = 64;
+
+ /** Put remove same key. */
+ private static IgniteInClosure<TestTransaction<Integer, Integer>>
putRemoveSameKey = tx -> {
+ tx.put(1, 1);
+ tx.remove(1);
+ tx.put(1, 100);
+ };
+
+ /** Put remove different key. */
+ private static IgniteInClosure<TestTransaction<Integer, Integer>>
putRemoveDifferentKey = tx -> {
+ tx.put(1, 1);
+ tx.remove(2);
+ };
+
+ /** Get put same key. */
+ private static IgniteInClosure<TestTransaction<Integer, Integer>>
getPutSameKey = tx -> {
+ int val = tx.get(1);
+ tx.put(1, val + 1);
+ };
+
+ /** Get put different key. */
+ private static IgniteInClosure<TestTransaction<Integer, Integer>>
getPutDifferentKey = tx -> {
+ int val = tx.get(1);
+ tx.put(2, val + 1);
+ };
+
+ /** Put all remove all same keys. */
+ private static IgniteInClosure<TestTransaction<Integer, Integer>>
putAllRemoveAllSameKeys = tx -> {
+ tx.putAll(Maps.asMap(Sets.newSet(1, 2, 3, 4, 5), k -> k));
+ tx.removeAll(Sets.newSet(1, 2, 3, 4, 5));
+ };
+
+ /** Put all remove all different keys. */
+ private static IgniteInClosure<TestTransaction<Integer, Integer>>
putAllRemoveAllDifferentKeys = tx -> {
+ tx.putAll(Maps.asMap(Sets.newSet(1, 2, 3, 4, 5), k -> k));
+ tx.removeAll(Sets.newSet(6, 7, 8, 9, 10));
+ };
+
+ /** Random operation. */
+ private static IgniteInClosure<TestTransaction<Integer, Integer>>
randomOperation = tx -> {
+ long seed = ThreadLocalRandom.current().nextLong();
+ log.info("Seed: " + seed);
+ Random random = new Random(seed);
+
+ for (int it = 0; it < 10; it++) {
+ int operation =
random.nextInt(TestTransaction.POSSIBLE_OPERATIONS);
+
+ switch (operation) {
+ // Get:
+ case 0: {
+ int key = random.nextInt(KEYS_SET);
+
+ tx.get(key);
+
+ break;
+ }
+ // Put:
+ case 1: {
+ int key = random.nextInt(KEYS_SET);
+ int val = random.nextInt(KEYS_SET);
+
+ tx.put(key, val);
+
+ break;
+ }
+ // Remove:
+ case 2: {
+ int key = random.nextInt(KEYS_SET);
+
+ tx.remove(key);
+
+ break;
+ }
+ // Put All:
+ case 3: {
+ tx.putAll(
+ random.ints(5, 0, KEYS_SET)
+ .boxed()
+ .distinct()
+ .collect(toMap(
+ k -> k, k -> k)
+ )
+ );
+
+ break;
+ }
+ // Remove All:
+ case 4: {
+ tx.removeAll(
+ random.ints(5, 0, KEYS_SET).boxed().collect(toSet())
+ );
+
+ break;
+ }
+ }
+ }
+ };
+
+ /**
+ * Interface to work with cache operations within transaction.
+ */
+ private static interface TestTransaction<K, V> {
+ /** Possible operations. */
+ static int POSSIBLE_OPERATIONS = 5;
+
+ /**
+ * @param key Key.
+ * @return Value.
+ */
+ V get(K key);
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ */
+ void put(K key, V val);
+
+ /**
+ * @param key Key.
+ */
+ void remove(K key);
+
+ /**
+ * @param map Map.
+ */
+ void putAll(Map<K, V> map);
+
+ /**
+ * @param keys Keys.
+ */
+ void removeAll(Set<K> keys);
+ }
+
+ /**
+ * Closure with possibility to set name to have proper print in test
parameters.
+ */
+ private static class NamedClosure<K, V> implements
IgniteInClosure<TestTransaction<K, V>> {
+ /** Closure. */
+ private final IgniteInClosure<TestTransaction<K, V>> c;
+
+ /** Name. */
+ private final String name;
+
+ /**
+ * @param c Closure.
+ * @param name Name.
+ */
+ public NamedClosure(IgniteInClosure<TestTransaction<K, V>> c, String
name) {
+ this.c = c;
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void apply(TestTransaction<K, V> kvTestTransaction) {
+ c.apply(kvTestTransaction);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return name;
+ }
+ }
+
+ /**
+ * Implementation for transaction operations backed by Ignite cache.
+ */
+ private static class TestTransactionEngine<K, V> implements
TestTransaction<K, V> {
+ /** Removed. */
+ private final Object RMV = new Object();
+
+ /** Cache. */
+ private final IgniteCache<K, V> cache;
+
+ /** Map to consistency check. */
+ private final Map<K, Object> map;
+
+ /**
+ * @param cache Cache.
+ */
+ TestTransactionEngine(IgniteCache<K, V> cache) {
+ this.cache = cache;
+ this.map = new HashMap<>();
+ }
+
+ /** {@inheritDoc} */
+ @Override public V get(K key) {
+ return cache.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void put(K key, V val) {
+ map.put(key, val);
+ cache.put(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(K key) {
+ map.put(key, RMV);
+ cache.remove(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void putAll(Map<K, V> map) {
+ for (Map.Entry<K, V> entry : map.entrySet())
+ this.map.put(entry.getKey(), entry.getValue());
+ cache.putAll(map);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeAll(Set<K> keys) {
+ for (K key : keys)
+ map.put(key, RMV);
+ cache.removeAll(keys);
+ }
+
+ /**
+ * Consistency check for transaction operations.
+ */
+ public void consistencyCheck() {
+ for (Map.Entry<K, Object> entry : map.entrySet()) {
+ if (entry.getValue() == RMV)
+ Assert.assertNull("Value is not null for key: " +
entry.getKey(), cache.get(entry.getKey()));
+ else
+ Assert.assertEquals("Values are different for key: " +
entry.getKey(),
+ entry.getValue(),
+ cache.get(entry.getKey())
+ );
+ }
+ }
+ }
+
+ /** Concurrency. */
+ @Parameterized.Parameter(0)
+ public TransactionConcurrency concurrency;
+
+ /** Isolation. */
+ @Parameterized.Parameter(1)
+ public TransactionIsolation isolation;
+
+ /** Operation. */
+ @Parameterized.Parameter(2)
+ public IgniteInClosure<TestTransaction<?, ?>> operation;
+
+ /** Client disco spi block. */
+ private CountDownLatch clientDiscoSpiBlock;
+
+ /** Client node to perform operations. */
+ private IgniteEx clnt;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ startGrid(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** */
+ @Before
+ public void before() throws Exception {
+ clientMode = true;
+
+ NodeJoinInterceptingDiscoverySpi clientDiscoSpi = new
NodeJoinInterceptingDiscoverySpi();
+
+ clientDiscoSpiBlock = new CountDownLatch(1);
+
+ // Delay node join of second client.
+ clientDiscoSpi.interceptor = msg -> {
+ if (msg.nodeId().toString().endsWith("2"))
+ U.awaitQuiet(clientDiscoSpiBlock);
+ };
+
+ discoverySpiSupplier = () -> clientDiscoSpi;
+
+ clnt = startGrid(1);
+
+ for (int k = 0; k < 64; k++)
+ clnt.cache(CACHE_NAME).put(k, 0);
+
+ discoverySpiSupplier = TcpDiscoverySpi::new;
+
+ startGrid(2);
+ }
+
+ /** */
+ @After
+ public void after() throws Exception {
+ // Stop client nodes.
+ stopGrid(1);
+ stopGrid(2);
+ }
+
+ /** */
+ @Test
+ public void testTransactionRemap() throws Exception {
+ TestTransactionEngine engine = new
TestTransactionEngine<>(clnt.cache(CACHE_NAME));
+
+ IgniteInternalFuture<?> txFut = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = clnt.transactions().txStart(concurrency,
isolation)) {
+ operation.apply(engine);
+
+ tx.commit();
+ }
+ });
+
+ try {
+ txFut.get(1, TimeUnit.SECONDS);
+ }
+ catch (IgniteFutureTimeoutCheckedException te) {
+ // Expected.
+ }
+ finally {
+ clientDiscoSpiBlock.countDown();
+ }
+
+ // After resume second client join, transaction should succesfully
await new affinity and commit.
+ txFut.get();
+
+ // Check consistency after transaction commit.
+ engine.consistencyCheck();
+ }
+
+ /** */
+ @Test
+ public void testTransactionRemapWithTimeout() throws Exception {
+ TestTransactionEngine engine = new
TestTransactionEngine<>(clnt.cache(CACHE_NAME));
+
+ IgniteInternalFuture<?> txFut = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = clnt.transactions().txStart(concurrency,
isolation, 1_000, 1_000_000)) {
+ operation.apply(engine);
+
+ tx.commit();
+ }
+ });
+
+ try {
+ txFut.get(2, TimeUnit.SECONDS);
+ }
+ catch (IgniteFutureTimeoutCheckedException te) {
+ // Expected.
+ }
+ finally {
+ clientDiscoSpiBlock.countDown();
+ }
+
+ // After resume second client join, transaction should be timed out
and rolled back.
+ if (concurrency == PESSIMISTIC) {
+ assertThrowsWithCause((Callable<Object>) txFut::get,
TransactionTimeoutException.class);
+
+ // Check that initial data is not changed by rollbacked
transaction.
+ for (int k = 0; k < KEYS_SET; k++)
+ Assert.assertEquals("Cache consistency is broken for key: " +
k, 0, clnt.cache(CACHE_NAME).get(k));
+ }
+ else {
+ txFut.get();
+
+ engine.consistencyCheck();
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
index 73024f6..a22e341 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
@@ -17,12 +17,10 @@
package org.apache.ignite.internal.processors.cache.transactions;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
@@ -31,16 +29,19 @@ import
org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
@@ -130,7 +131,19 @@ public class TxCrossCacheMapOnInvalidTopologyTest extends
GridCommonAbstractTest
}
/**
+ * Test scenario: cross-cache tx is started when node is left in the
middle of rebalance, first cache is rebalanced
+ * and second is partially rebalanced.
*
+ * First cache map request will trigger client compatible remap for
pessimistic txs,
+ * second cache map request should use new topology version.
+ *
+ * For optimistic tx remap is enforced if more than one mapping in
transaction or all enlisted caches have compatible
+ * assignments.
+ *
+ * Success: tx is finished on ideal topology version over all mapped nodes.
+ *
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
*/
private void doTestCrossCacheTxMapOnInvalidTopology(TransactionConcurrency
concurrency, TransactionIsolation isolation) throws Exception {
try {
@@ -139,7 +152,7 @@ public class TxCrossCacheMapOnInvalidTopologyTest extends
GridCommonAbstractTest
awaitPartitionMapExchange();
- Ignite client = startGrid("client");
+ IgniteEx client = startGrid("client");
assertNotNull(client.cache(CACHE1));
assertNotNull(client.cache(CACHE2));
@@ -159,9 +172,12 @@ public class TxCrossCacheMapOnInvalidTopologyTest extends
GridCommonAbstractTest
final AffinityTopologyVersion joinVer = new
AffinityTopologyVersion(4, 0);
AffinityTopologyVersion leftVer = new AffinityTopologyVersion(5,
0);
+ AffinityTopologyVersion idealVer = new AffinityTopologyVersion(5,
1);
AtomicReference<Set<Integer>> full = new AtomicReference<>();
+ GridConcurrentSkipListSet<Integer> leftVerParts = new
GridConcurrentSkipListSet<>();
+
crdSpi.blockMessages((node, m) -> {
if (m instanceof GridDhtPartitionSupplyMessage) {
GridDhtPartitionSupplyMessage msg =
(GridDhtPartitionSupplyMessage)m;
@@ -183,7 +199,20 @@ public class TxCrossCacheMapOnInvalidTopologyTest extends
GridCommonAbstractTest
return true;
}
- if (msg.topologyVersion().equals(leftVer))
+ if (msg.topologyVersion().equals(leftVer)) {
+ Map<Integer, Long> last = U.field(msg, "last");
+
+ leftVerParts.addAll(last.keySet());
+
+ return true;
+ }
+ } else if (m instanceof GridDhtPartitionsFullMessage) {
+ GridDhtPartitionsFullMessage msg =
(GridDhtPartitionsFullMessage)m;
+
+ // Delay full message for ideal topology switch.
+ GridDhtPartitionExchangeId exchId = msg.exchangeId();
+
+ if (exchId != null &&
exchId.topologyVersion().equals(idealVer))
return true;
}
@@ -237,76 +266,104 @@ public class TxCrossCacheMapOnInvalidTopologyTest
extends GridCommonAbstractTest
grid(0).cachex(CACHE2).context().affinity().affinityReadyFuture(leftVer).get();
grid(2).cachex(CACHE2).context().affinity().affinityReadyFuture(leftVer).get();
+ AffinityAssignment assignment0 =
grid(0).cachex(CACHE1).context().affinity().assignment(leftVer);
AffinityAssignment assignment =
grid(0).cachex(CACHE2).context().affinity().assignment(leftVer);
// Search for a partition with incompatible assignment.
- int movingPart = -1;
+ int stablePart = -1; // Partition for cache1 which is mapped for
both late and ideal topologies to the same primary.
+ int movingPart = -1; // Partition for cache2 which is mapped for
both late and ideal topologies on different primaries.
- for (int p = 0; p < assignment.assignment().size(); p++) {
- List<ClusterNode> nodes = assignment.assignment().get(p);
- List<ClusterNode> nodes2 = assignment.idealAssignment().get(p);
+ for (int p = 0; p < assignment0.assignment().size(); p++) {
+ List<ClusterNode> curr = assignment.assignment().get(p);
+ List<ClusterNode> ideal = assignment.idealAssignment().get(p);
- if (!nodes.equals(nodes2) && nodes.get(0).order() == 1)
- movingPart = p;
+ if (curr.equals(ideal) && curr.get(0).order() == 1) {
+ stablePart = p;
+
+ break;
+ }
}
- assertFalse(movingPart == -1);
+ assertFalse(stablePart == -1);
- // Delay rebalance for next top ver.
- TestRecordingCommunicationSpi.spi(grid(2)).blockMessages((node,
message) -> {
- if (message instanceof GridDhtPartitionsSingleMessage) {
- GridDhtPartitionsSingleMessage sm =
(GridDhtPartitionsSingleMessage)message;
+ for (int p = 0; p < assignment.assignment().size(); p++) {
+ List<ClusterNode> curr = assignment.assignment().get(p);
+ List<ClusterNode> ideal = assignment.idealAssignment().get(p);
- return sm.exchangeId() != null;
+ if (!curr.equals(ideal) && curr.get(0).order() == 1) {
+ movingPart = p;
+
+ break;
}
+ }
- return false;
- });
+ assertFalse(movingPart == -1);
TestRecordingCommunicationSpi.spi(client).blockMessages(new
IgniteBiPredicate<ClusterNode, Message>() {
- @Override public boolean apply(ClusterNode node, Message
message) {
+ @Override public boolean apply(ClusterNode node, Message msg) {
if (concurrency == PESSIMISTIC)
- return message instanceof GridNearLockRequest;
+ return msg instanceof GridNearLockRequest;
else
- return message instanceof GridNearTxPrepareRequest;
+ return msg instanceof GridNearTxPrepareRequest;
}
});
+ final int finalStablePart = stablePart;
final int finalMovingPart = movingPart;
- IgniteInternalFuture<?> txFut = multithreadedAsync(new Runnable() {
- @Override public void run() {
- try (Transaction tx =
client.transactions().txStart(concurrency, isolation)) {
- Map<Integer, Integer> map = new LinkedHashMap<>();
- for (int p = 0; p < PARTS_CNT; p++)
- map.put(p, p);
+ IgniteInternalFuture<?> txFut = multithreadedAsync(() -> {
+ try (Transaction tx =
client.transactions().txStart(concurrency, isolation)) {
+ client.cache(CACHE1).put(finalStablePart, 0); // Will map
on crd(order=1).
- client.cache(CACHE1).putAll(map); // Will successfully
lock topology.
- client.cache(CACHE2).put(finalMovingPart, 0); //
Should remap but will go through without fix.
+ // Next request will remap to ideal topology, but it's not
ready on other node except crd.
+ client.cache(CACHE2).put(finalMovingPart, 0);
- tx.commit();
- }
+ tx.commit();
}
}, 1, "tx-thread");
+ // Wait until all missing supply messages are blocked.
+ assertTrue(GridTestUtils.waitForCondition(() ->
leftVerParts.size() == PARTS_CNT - full.get().size(), 5_000));
+
+ // Delay first lock request on late topology.
TestRecordingCommunicationSpi.spi(client).waitForBlocked();
- crdSpi.stopBlock(); // Continue rebalance and trigger next
topology switch.
+ // At this point only supply messages should be blocked.
+ // Unblock to continue rebalance and trigger ideal topology switch.
+ crdSpi.stopBlock(true, null, false, true);
+
+ // Wait until ideal topology is ready on crd.
+
crd.context().cache().context().exchange().affinityReadyFuture(idealVer).get(10_000);
- TestRecordingCommunicationSpi.spi(grid(2)).waitForBlocked();
+ // Other node must wait for full message.
+ assertFalse(GridTestUtils.waitForCondition(() ->
+
grid(2).context().cache().context().exchange().affinityReadyFuture(idealVer).isDone(),
1_000));
+ // Map on unstable topology (PME is in progress on other node).
TestRecordingCommunicationSpi.spi(client).stopBlock();
- doSleep(5000); // Make sure request will listen for current
topology future completion.
+ // Capture local transaction.
+ IgniteInternalTx tx0 =
client.context().cache().context().tm().activeTransactions().iterator().next();
- TestRecordingCommunicationSpi.spi(grid(2)).stopBlock();
+ // Expected behavior: tx must hang (both pessimistic and
optimistic) because topology is not ready.
+ try {
+ txFut.get(3_000);
- crdSpi.stopBlock();
+ fail("TX must not complete");
+ }
+ catch (IgniteFutureTimeoutCheckedException e) {
+ // Expected.
+ }
- awaitPartitionMapExchange();
+ crdSpi.stopBlock();
txFut.get();
+ // Check transaction map version. Should be mapped on ideal
topology.
+ assertEquals(tx0.topologyVersionSnapshot(), idealVer);
+
+ awaitPartitionMapExchange();
+
checkFutures();
}
finally {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
index 183f90a..5477a36 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
@@ -469,9 +469,7 @@ public class TxPartitionCounterStateConsistencyTest extends
TxPartitionCounterSt
*/
@Test
public void
testPartitionConsistencyDuringRebalanceAndConcurrentUpdates_NoOp() throws
Exception {
-
testPartitionConsistencyDuringRebalanceConcurrentlyWithTopologyChange(s -> {
- }, s -> {
- });
+
testPartitionConsistencyDuringRebalanceConcurrentlyWithTopologyChange(s -> {},
s -> {});
}
/**
@@ -629,9 +627,9 @@ public class TxPartitionCounterStateConsistencyTest extends
TxPartitionCounterSt
Integer key0 = primaryKey(grid(1).cache(DEFAULT_CACHE_NAME));
Integer key = primaryKey(grid(0).cache(DEFAULT_CACHE_NAME));
- TestRecordingCommunicationSpi spi =
TestRecordingCommunicationSpi.spi(crd);
+ TestRecordingCommunicationSpi crdSpi =
TestRecordingCommunicationSpi.spi(crd);
- spi.blockMessages((node, message) -> {
+ crdSpi.blockMessages((node, message) -> {
if (message instanceof GridDhtPartitionsFullMessage) {
GridDhtPartitionsFullMessage tmp =
(GridDhtPartitionsFullMessage)message;
@@ -644,11 +642,11 @@ public class TxPartitionCounterStateConsistencyTest
extends TxPartitionCounterSt
// Locks mapped wait.
CountDownLatch l = new CountDownLatch(1);
- IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
+ IgniteInternalFuture startNodeFut = GridTestUtils.runAsync(() -> {
U.awaitQuiet(l);
try {
- startGrid(SERVER_NODES);
+ startGrid(SERVER_NODES); // Start node out of BLT.
}
catch (Exception e) {
fail(X.getFullStackTrace(e));
@@ -662,7 +660,7 @@ public class TxPartitionCounterStateConsistencyTest extends
TxPartitionCounterSt
});
IgniteInternalFuture txFut = GridTestUtils.runAsync(() -> {
- try(Transaction tx = client.transactions().txStart()) {
+ try (Transaction tx = client.transactions().txStart()) {
Map<Integer, Integer> map = new LinkedHashMap<>();
map.put(key, key); // clientFirst=true in lockAll.
@@ -680,7 +678,7 @@ public class TxPartitionCounterStateConsistencyTest extends
TxPartitionCounterSt
l.countDown();
- spi.waitForBlocked(); // Block PME after finish on crd and
wait on others.
+ crdSpi.waitForBlocked(); // Block PME after finish on crd and
wait on others.
cliSpi.stopBlock(); // Start remote lock mapping.
}
@@ -689,12 +687,10 @@ public class TxPartitionCounterStateConsistencyTest
extends TxPartitionCounterSt
}
});
- txFut.get(); // Transaction should not be blocked by concurrent PME.
lockFut.get();
-
- spi.stopBlock();
-
- fut.get();
+ crdSpi.stopBlock();
+ txFut.get();
+ startNodeFut.get();
awaitPartitionMapExchange();
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
index 5a6b244..bb77047 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
@@ -28,6 +28,7 @@ import
org.apache.ignite.cache.affinity.AffinityHistoryCleanupTest;
import org.apache.ignite.cache.affinity.local.LocalAffinityFunctionTest;
import
org.apache.ignite.internal.GridCachePartitionExchangeManagerHistSizeTest;
import
org.apache.ignite.internal.processors.cache.CacheSerializableTransactionsTest;
+import
org.apache.ignite.internal.processors.cache.ClientSlowDiscoveryTransactionRemapTest;
import org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTest;
import
org.apache.ignite.internal.processors.cache.ClusterStatePartitionedSelfTest;
import
org.apache.ignite.internal.processors.cache.ClusterStateReplicatedSelfTest;
@@ -89,6 +90,7 @@ public class IgniteCacheMvccTestSuite5 {
ignoredTests.add(GridCachePartitionExchangeManagerHistSizeTest.class);
ignoredTests.add(ConcurrentCacheStartTest.class);
ignoredTests.add(IgniteCacheReadThroughEvictionsVariationsSuite.class);
+ ignoredTests.add(ClientSlowDiscoveryTransactionRemapTest.class);
return IgniteCacheTestSuite5.suite(ignoredTests);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index 756a913..c1684b0 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@ -32,7 +32,8 @@ import
org.apache.ignite.internal.processors.cache.CacheKeepBinaryTransactionTes
import org.apache.ignite.internal.processors.cache.CacheNearReaderUpdateTest;
import org.apache.ignite.internal.processors.cache.CacheRebalancingSelfTest;
import
org.apache.ignite.internal.processors.cache.CacheSerializableTransactionsTest;
-import org.apache.ignite.internal.processors.cache.ClientDelayedJoinTest;
+import
org.apache.ignite.internal.processors.cache.ClientSlowDiscoveryTopologyChangeTest;
+import
org.apache.ignite.internal.processors.cache.ClientSlowDiscoveryTransactionRemapTest;
import org.apache.ignite.internal.processors.cache.ClusterReadOnlyModeTest;
import
org.apache.ignite.internal.processors.cache.ClusterStatePartitionedSelfTest;
import
org.apache.ignite.internal.processors.cache.ClusterStateReplicatedSelfTest;
@@ -121,7 +122,8 @@ public class IgniteCacheTestSuite5 {
GridTestUtils.addTestIfNeeded(suite,
IgniteCacheReadThroughEvictionsVariationsSuite.class, ignoredTests);
- GridTestUtils.addTestIfNeeded(suite, ClientDelayedJoinTest.class,
ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
ClientSlowDiscoveryTopologyChangeTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
ClientSlowDiscoveryTransactionRemapTest.class, ignoredTests);
//GridTestUtils.addTestIfNeeded(suite,
GridCacheAtomicPreloadSelfTest.class, ignoredTests);